Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Commit 9e77e7e

Browse files
authored
Code cleanup in the sync module (#11552)
* Code cleanup `flush_queue()` is mostly used by tests remove some unused params avoid clones in a few places * Consistent params naming Consistent use of log calls * Fix todo It'a actually not trivial to find out the size of these collections, likely we keep blocks from all kinds of forks in the `self.blocks`/`self.parents` collections and there's no good way to anticipate how many blocks we're going to drain. Most of the time we end up draining 0 blocks and then we drain a whole bunch of them, up to 30 000. * Revert making flush_queue() test-only Address review grumbles * More review grumbles * fix build
1 parent 10d82ef commit 9e77e7e

File tree

4 files changed

+82
-75
lines changed

4 files changed

+82
-75
lines changed

ethcore/client-traits/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ pub trait IoClient: Sync + Send {
197197
/// Queue transactions for importing.
198198
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize);
199199

200-
/// Queue block import with transaction receipts. Does no sealing and transaction validation.
201-
fn queue_ancient_block(&self, block_bytes: Unverified, receipts_bytes: Bytes) -> EthcoreResult<H256>;
200+
/// Queue block import with transaction receipts. Does no sealing or transaction validation.
201+
fn queue_ancient_block(&self, unverified: Unverified, receipts_bytes: Bytes) -> EthcoreResult<H256>;
202202

203203
/// Queue consensus engine message.
204204
fn queue_consensus_message(&self, message: Bytes);

ethcore/src/client/client.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ struct Importer {
178178
}
179179

180180
/// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue.
181-
/// Call `import_block()` to import a block asynchronously; `flush_queue()` flushes the queue.
181+
/// Call `import_block()` to import a block asynchronously.
182182
pub struct Client {
183183
/// Flag used to disable the client forever. Not to be confused with `liveness`.
184184
///
@@ -870,7 +870,7 @@ impl Client {
870870
*self.on_user_defaults_change.lock() = Some(Box::new(f));
871871
}
872872

873-
/// Flush the block import queue.
873+
/// Flush the block import queue. Used mostly for tests.
874874
pub fn flush_queue(&self) {
875875
self.importer.block_queue.flush();
876876
while !self.importer.block_queue.is_empty() {
@@ -1444,6 +1444,7 @@ impl ImportBlock for Client {
14441444
return Err(EthcoreError::Block(BlockError::UnknownParent(unverified.parent_hash())));
14451445
}
14461446

1447+
// If the queue is empty we propagate the block in a `PriorityTask`.
14471448
let raw = if self.importer.block_queue.is_empty() {
14481449
Some((unverified.bytes.clone(), *unverified.header.difficulty()))
14491450
} else {
@@ -2729,6 +2730,7 @@ impl ImportExportBlocks for Client {
27292730
}
27302731
};
27312732
self.flush_queue();
2733+
27322734
Ok(())
27332735
}
27342736
}

ethcore/sync/src/block_sync.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -407,9 +407,9 @@ impl BlockDownloader {
407407
trace_sync!(self, "Error decoding block receipts RLP: {:?}", e);
408408
BlockDownloaderImportError::Invalid
409409
})?;
410-
receipts.push(receipt.as_raw().to_vec());
410+
receipts.push(receipt.as_raw());
411411
}
412-
let hashes = self.blocks.insert_receipts(receipts);
412+
let hashes = self.blocks.insert_receipts(&receipts);
413413
if hashes.len() != item_count {
414414
trace_sync!(self, "Deactivating peer for giving invalid block receipts");
415415
return Err(BlockDownloaderImportError::Invalid);
@@ -501,15 +501,15 @@ impl BlockDownloader {
501501
MAX_BODIES_TO_REQUEST_SMALL
502502
};
503503

504-
let needed_bodies = self.blocks.needed_bodies(number_of_bodies_to_request, false);
504+
let needed_bodies = self.blocks.needed_bodies(number_of_bodies_to_request);
505505
if !needed_bodies.is_empty() {
506506
return Some(BlockRequest::Bodies {
507507
hashes: needed_bodies,
508508
});
509509
}
510510

511511
if self.download_receipts {
512-
let needed_receipts = self.blocks.needed_receipts(MAX_RECEPITS_TO_REQUEST, false);
512+
let needed_receipts = self.blocks.needed_receipts(MAX_RECEPITS_TO_REQUEST);
513513
if !needed_receipts.is_empty() {
514514
return Some(BlockRequest::Receipts {
515515
hashes: needed_receipts,
@@ -518,7 +518,7 @@ impl BlockDownloader {
518518
}
519519

520520
// find subchain to download
521-
if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST, false) {
521+
if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST) {
522522
return Some(BlockRequest::Headers {
523523
start: h,
524524
count: count as u64,

ethcore/sync/src/blocks.rs

Lines changed: 71 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::collections::{HashSet, HashMap, hash_map};
1919
use bytes::Bytes;
2020
use ethereum_types::H256;
2121
use keccak_hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY_LIST_RLP};
22-
use log::{trace, warn};
22+
use log::{debug, trace, warn};
2323
use parity_util_mem::MallocSizeOf;
2424
use rlp::{Rlp, RlpStream, DecoderError};
2525
use triehash_ethereum::ordered_trie_root;
@@ -103,7 +103,7 @@ fn unverified_from_sync(header: SyncHeader, body: Option<SyncBody>) -> Unverifie
103103
header: header.header,
104104
transactions: body.transactions,
105105
uncles: body.uncles,
106-
bytes: stream.out().to_vec(),
106+
bytes: stream.out(),
107107
}
108108
}
109109

@@ -196,11 +196,11 @@ impl BlockCollection {
196196
}
197197

198198
/// Insert a collection of block receipts for previously downloaded headers.
199-
pub fn insert_receipts(&mut self, receipts: Vec<Bytes>) -> Vec<Vec<H256>> {
199+
pub fn insert_receipts(&mut self, receipts: &[&[u8]]) -> Vec<Vec<H256>> {
200200
if !self.need_receipts {
201201
return Vec::new();
202202
}
203-
receipts.into_iter()
203+
receipts.iter()
204204
.filter_map(|r| {
205205
self.insert_receipt(r)
206206
.map_err(|e| trace!(target: "sync", "Ignored invalid receipt: {:?}", e))
@@ -210,24 +210,28 @@ impl BlockCollection {
210210
}
211211

212212
/// Returns a set of block hashes that require a body download. The returned set is marked as being downloaded.
213-
pub fn needed_bodies(&mut self, count: usize, _ignore_downloading: bool) -> Vec<H256> {
213+
pub fn needed_bodies(&mut self, count: usize) -> Vec<H256> {
214214
if self.head.is_none() {
215215
return Vec::new();
216216
}
217-
let mut needed_bodies: Vec<H256> = Vec::new();
217+
let mut needed_bodies: Vec<H256> = Vec::with_capacity(count);
218218
let mut head = self.head;
219-
while head.is_some() && needed_bodies.len() < count {
220-
head = self.parents.get(&head.unwrap()).cloned();
221-
if let Some(head) = head {
222-
match self.blocks.get(&head) {
223-
Some(block) if block.body.is_none() && !self.downloading_bodies.contains(&head) => {
224-
self.downloading_bodies.insert(head.clone());
225-
needed_bodies.push(head.clone());
219+
while needed_bodies.len() < count {
220+
head = match head {
221+
Some(head) => {
222+
match self.blocks.get(&head) {
223+
Some(block) if block.body.is_none() && !self.downloading_bodies.contains(&head) => {
224+
self.downloading_bodies.insert(head.clone());
225+
needed_bodies.push(head.clone());
226+
}
227+
_ => (),
226228
}
227-
_ => (),
228-
}
229-
}
229+
self.parents.get(&head).copied()
230+
},
231+
None => break
232+
};
230233
}
234+
231235
for h in self.header_ids.values() {
232236
if needed_bodies.len() >= count {
233237
break;
@@ -241,25 +245,28 @@ impl BlockCollection {
241245
}
242246

243247
/// Returns a set of block hashes that require a receipt download. The returned set is marked as being downloaded.
244-
pub fn needed_receipts(&mut self, count: usize, _ignore_downloading: bool) -> Vec<H256> {
248+
pub fn needed_receipts(&mut self, count: usize) -> Vec<H256> {
245249
if self.head.is_none() || !self.need_receipts {
246250
return Vec::new();
247251
}
248-
let mut needed_receipts: Vec<H256> = Vec::new();
252+
let mut needed_receipts: Vec<H256> = Vec::with_capacity(count);
249253
let mut head = self.head;
250-
while head.is_some() && needed_receipts.len() < count {
251-
head = self.parents.get(&head.unwrap()).cloned();
252-
if let Some(head) = head {
253-
match self.blocks.get(&head) {
254-
Some(block) => {
255-
if block.receipts.is_none() && !self.downloading_receipts.contains(&block.receipts_root) {
256-
self.downloading_receipts.insert(block.receipts_root);
257-
needed_receipts.push(head.clone());
254+
while needed_receipts.len() < count {
255+
head = match head {
256+
Some(head) => {
257+
match self.blocks.get(&head) {
258+
Some(block) => {
259+
if block.receipts.is_none() && !self.downloading_receipts.contains(&block.receipts_root) {
260+
self.downloading_receipts.insert(block.receipts_root);
261+
needed_receipts.push(head);
262+
}
258263
}
264+
_ => (),
259265
}
260-
_ => (),
261-
}
262-
}
266+
self.parents.get(&head).copied()
267+
},
268+
None => break
269+
};
263270
}
264271
// If there are multiple blocks per receipt, only request one of them.
265272
for (root, h) in self.receipt_ids.iter().map(|(root, hashes)| (root, hashes[0])) {
@@ -275,12 +282,12 @@ impl BlockCollection {
275282
}
276283

277284
/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
278-
pub fn needed_headers(&mut self, count: usize, ignore_downloading: bool) -> Option<(H256, usize)> {
285+
pub fn needed_headers(&mut self, count: usize) -> Option<(H256, usize)> {
279286
// find subchain to download
280287
let mut download = None;
281288
{
282289
for h in &self.heads {
283-
if ignore_downloading || !self.downloading_headers.contains(h) {
290+
if !self.downloading_headers.contains(h) {
284291
self.downloading_headers.insert(h.clone());
285292
download = Some(h.clone());
286293
break;
@@ -317,42 +324,40 @@ impl BlockCollection {
317324
return Vec::new();
318325
}
319326

320-
let mut drained = Vec::new();
321327
let mut hashes = Vec::new();
322-
{
323-
let mut blocks = Vec::new();
324-
let mut head = self.head;
325-
while let Some(h) = head {
326-
head = self.parents.get(&h).cloned();
327-
if let Some(head) = head {
328-
match self.blocks.remove(&head) {
329-
Some(block) => {
330-
if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) {
331-
blocks.push(block);
332-
hashes.push(head);
333-
self.head = Some(head);
334-
} else {
335-
self.blocks.insert(head, block);
336-
break;
337-
}
338-
},
339-
_ => {
328+
let mut blocks = Vec::new();
329+
let mut head = self.head;
330+
while let Some(h) = head {
331+
head = self.parents.get(&h).copied();
332+
if let Some(head) = head {
333+
match self.blocks.remove(&head) {
334+
Some(block) => {
335+
if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) {
336+
blocks.push(block);
337+
hashes.push(head);
338+
self.head = Some(head);
339+
} else {
340+
self.blocks.insert(head, block);
340341
break;
341-
},
342-
}
342+
}
343+
},
344+
_ => {
345+
break;
346+
},
343347
}
344348
}
349+
}
345350

346-
for block in blocks.into_iter() {
347-
let unverified = unverified_from_sync(block.header, block.body);
348-
drained.push(BlockAndReceipts {
349-
block: unverified,
350-
receipts: block.receipts.clone(),
351-
});
352-
}
351+
let mut drained = Vec::with_capacity(blocks.len());
352+
for block in blocks {
353+
let unverified = unverified_from_sync(block.header, block.body);
354+
drained.push(BlockAndReceipts {
355+
block: unverified,
356+
receipts: block.receipts,
357+
});
353358
}
354359

355-
trace!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head);
360+
debug!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head);
356361
drained
357362
}
358363

@@ -409,7 +414,7 @@ impl BlockCollection {
409414
}
410415
}
411416

412-
fn insert_receipt(&mut self, r: Bytes) -> Result<Vec<H256>, network::Error> {
417+
fn insert_receipt(&mut self, r: &[u8]) -> Result<Vec<H256>, network::Error> {
413418
let receipt_root = {
414419
let receipts = Rlp::new(&r);
415420
ordered_trie_root(receipts.iter().map(|r| r.as_raw()))
@@ -422,7 +427,7 @@ impl BlockCollection {
422427
match self.blocks.get_mut(&h) {
423428
Some(ref mut block) => {
424429
trace!(target: "sync", "Got receipt {}", h);
425-
block.receipts = Some(r.clone());
430+
block.receipts = Some(r.to_vec());
426431
},
427432
None => {
428433
warn!("Got receipt with no header {}", h);
@@ -581,11 +586,11 @@ mod test {
581586
bc.reset_to(heads);
582587
assert!(!bc.is_empty());
583588
assert_eq!(hashes[0], bc.heads[0]);
584-
assert!(bc.needed_bodies(1, false).is_empty());
589+
assert!(bc.needed_bodies(1).is_empty());
585590
assert!(!bc.contains(&hashes[0]));
586591
assert!(!bc.is_downloading(&hashes[0]));
587592

588-
let (h, n) = bc.needed_headers(6, false).unwrap();
593+
let (h, n) = bc.needed_headers(6).unwrap();
589594
assert!(bc.is_downloading(&hashes[0]));
590595
assert_eq!(hashes[0], h);
591596
assert_eq!(n, 6);
@@ -608,9 +613,9 @@ mod test {
608613
assert!(!bc.contains(&hashes[0]));
609614
assert_eq!(hashes[5], bc.head.unwrap());
610615

611-
let (h, _) = bc.needed_headers(6, false).unwrap();
616+
let (h, _) = bc.needed_headers(6).unwrap();
612617
assert_eq!(hashes[5], h);
613-
let (h, _) = bc.needed_headers(6, false).unwrap();
618+
let (h, _) = bc.needed_headers(6).unwrap();
614619
assert_eq!(hashes[20], h);
615620
bc.insert_headers(headers[10..16].into_iter().map(Clone::clone).collect());
616621
assert!(bc.drain().is_empty());

0 commit comments

Comments
 (0)