This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Commit 8c2199d

Snapshot restoration overhaul (#11219)
* Comments and todos. Use `snapshot_sync` as logging target
* Fix compilation
* More todos, more logs
* Fix picking snapshot peer: prefer the one with the highest block number. More docs, comments, todos
* Adjust WAIT_PEERS_TIMEOUT to be a multiple of MAINTAIN_SYNC_TIMER to try to fix snapshot startup problems. Docs, todos, comments
* Tabs
* Formatting
* Don't build new rlp::EMPTY_LIST_RLP instances
* Dial down debug logging
* Don't warn about missing hashes in the manifest: it's normal. Log client version on peer connect
* Cleanup
* Do not skip snapshots further away than 30k blocks from the highest block seen. Currently we look for peers that seed snapshots close to the highest block seen on the network (where "close" means within 30k blocks). When a node starts up we wait for some time (5 sec, increased here to 10 sec) to let peers connect, and if we have found a suitable peer to sync a snapshot from at the end of that delay, we start the download; if none is found and --warp-barrier is used we stall, otherwise we start a slow sync. When looking for a suitable snapshot, we use the highest block seen on the network to check whether a peer has a snapshot within 30k blocks of that highest block number. This means that in a situation where all available snapshots are older than that, we will often fail to start a snapshot at all. What's worse is that the longer we delay starting a snapshot sync (to let more peers connect, in the hope of finding a good snapshot), the more likely we are to have seen a high block and thus the more likely we become to accept a snapshot. This commit removes the highest-block-number comparison entirely and picks the best snapshot we find in 10 sec.
* Lockfile
* Add a `ChunkType::Dupe` variant so that we do not disconnect a peer if they happen to send us a duplicate chunk (just ignore the chunk and keep going). Resolve some documentation todos, add more
* Tweak log message
* Don't warp sync twice: check if our own block is beyond the given warp barrier (can happen after we've completed a warp sync but are not quite yet synced up to the tip) and if so, don't sync. More docs, resolve todos. Dial down some `sync` debug-level logging to trace
* Avoid iterating over all snapshot block/state hashes to find the next work item: use a HashSet instead of a Vec and remove items from the set as chunks are processed. Calculate and store the total number of chunks in the `Snapshot` struct instead of counting pending chunks each time (see the sketch below)
* Address review grumbles
* Log correct number of bytes written to disk
* Revert ChunkType::Dup change
* Whitespace grumble
* Cleanup debugging code
* Fix docs
* Fix import and a typo
* Fix test impl
* Use `indexmap::IndexSet` to ensure chunk hashes are accessed in order
* Revert increased SNAPSHOT_MANIFEST_TIMEOUT: 5 sec should be enough
1 parent 6b17e32 commit 8c2199d
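A rough sketch of the chunk bookkeeping described in the commit message, for illustration only: pending chunk hashes live in a set that shrinks as chunks are processed, and the total chunk count is computed once up front instead of being recounted. All names here are assumptions, not the actual `Snapshot` struct from `ethcore/sync`; the final commit additionally swaps the `HashSet` for an `indexmap::IndexSet` so chunks are still handed out in manifest order (see the `indexmap` dependency added to `ethcore/sync/Cargo.toml` below).

use std::collections::HashSet;
use ethereum_types::H256;

// Illustrative only: field and method names are assumptions.
struct ChunkBookkeeping {
	// Chunks still to be downloaded; entries are removed as chunks are
	// processed, so finding remaining work no longer scans a whole Vec.
	pending_state_chunks: HashSet<H256>,
	pending_block_chunks: HashSet<H256>,
	// Total number of chunks in the manifest, stored once up front.
	total_chunks: usize,
}

impl ChunkBookkeeping {
	fn new(state_hashes: Vec<H256>, block_hashes: Vec<H256>) -> Self {
		let total_chunks = state_hashes.len() + block_hashes.len();
		ChunkBookkeeping {
			pending_state_chunks: state_hashes.into_iter().collect(),
			pending_block_chunks: block_hashes.into_iter().collect(),
			total_chunks,
		}
	}

	// Mark a chunk as processed: a set removal instead of a Vec scan.
	fn chunk_processed(&mut self, hash: &H256) {
		self.pending_state_chunks.remove(hash);
		self.pending_block_chunks.remove(hash);
	}

	// Progress, derived from the stored total rather than recounted.
	fn done_chunks(&self) -> usize {
		self.total_chunks
			- self.pending_state_chunks.len()
			- self.pending_block_chunks.len()
	}
}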

File tree

17 files changed: +426 / -295 lines

Cargo.lock

Lines changed: 52 additions & 48 deletions

ethcore/snapshot/src/lib.rs

Lines changed: 0 additions & 1 deletion
@@ -452,7 +452,6 @@ impl StateRebuilder {
 StateDB::commit_bloom(&mut batch, bloom_journal)?;
 self.db.inject(&mut batch)?;
 backing.write_buffered(batch);
-trace!(target: "snapshot", "current state root: {:?}", self.state_root);
 Ok(())
 }
 

ethcore/snapshot/src/service.rs

Lines changed: 2 additions & 2 deletions
@@ -161,6 +161,7 @@ impl Restoration {
 
 if let Some(ref mut writer) = self.writer.as_mut() {
 writer.write_state_chunk(hash, chunk)?;
+trace!(target: "snapshot", "Wrote {}/{} bytes of state to db/disk. Current state root: {:?}", len, chunk.len(), self.state.state_root());
 }
 
 self.state_chunks_left.remove(&hash);
@@ -676,7 +677,6 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
 } else if manifest.state_hashes.contains(&hash) {
 true
 } else {
-warn!(target: "snapshot", "Hash of the content of {:?} not present in the manifest block/state hashes.", path);
 return Ok(false);
 };
 
@@ -788,7 +788,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
 false => Ok(())
 }
 }
-other => other.map(drop),
+Err(e) => Err(e)
 };
 (res, db)
 }

ethcore/snapshot/src/traits.rs

Lines changed: 1 addition & 2 deletions
@@ -52,8 +52,7 @@ pub trait SnapshotService : Sync + Send {
 fn status(&self) -> RestorationStatus;
 
 /// Begin snapshot restoration.
-/// If restoration in-progress, this will reset it.
-/// From this point on, any previous snapshot may become unavailable.
+/// If a restoration is in progress, this will reset it and clear all data.
 fn begin_restore(&self, manifest: ManifestData);
 
 /// Abort an in-progress restoration if there is one.

ethcore/sync/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ ethcore-private-tx = { path = "../private-tx" }
 ethereum-types = "0.8.0"
 fastmap = { path = "../../util/fastmap" }
 futures = "0.1"
+indexmap = "1.3.0"
 keccak-hash = "0.4.0"
 light = { package = "ethcore-light", path = "../light" }
 log = "0.4"
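The new `indexmap` dependency backs the `IndexSet` mentioned in the commit message: unlike `std::collections::HashSet`, an `IndexSet` iterates in insertion order, so chunk hashes can be visited in the order the manifest lists them. A minimal, self-contained illustration (not code from the repository):

use indexmap::IndexSet;

fn main() {
	// Insertion order is preserved, so iteration yields "c", "a", "b" --
	// exactly the order the items were inserted. A plain HashSet makes
	// no such guarantee.
	let chunks: IndexSet<&str> = ["c", "a", "b"].iter().copied().collect();
	for chunk in &chunks {
		println!("next chunk: {}", chunk);
	}
}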

ethcore/sync/src/api.rs

Lines changed: 1 addition & 1 deletion
@@ -295,7 +295,7 @@ pub struct EthSync {
 light_subprotocol_name: [u8; 3],
 /// Priority tasks notification channel
 priority_tasks: Mutex<mpsc::Sender<PriorityTask>>,
-/// for state tracking
+/// Track the sync state: are we importing or verifying blocks?
 is_major_syncing: Arc<AtomicBool>
 }
 

ethcore/sync/src/block_sync.rs

Lines changed: 1 addition & 1 deletion
@@ -309,7 +309,7 @@ impl BlockDownloader {
 }
 }
 }
-
+// Update the highest block number seen on the network from the header.
 if let Some((number, _)) = last_header {
 if self.highest_block.as_ref().map_or(true, |n| number > *n) {
 self.highest_block = Some(number);

ethcore/sync/src/chain/handler.rs

Lines changed: 33 additions & 29 deletions
@@ -43,7 +43,7 @@ use ethereum_types::{H256, U256};
 use keccak_hash::keccak;
 use network::PeerId;
 use network::client_version::ClientVersion;
-use log::{debug, trace, error};
+use log::{debug, trace, error, warn};
 use rlp::Rlp;
 use common_types::{
 BlockNumber,
@@ -76,14 +76,14 @@ impl SyncHandler {
 SignedPrivateTransactionPacket => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp),
 PrivateStatePacket => SyncHandler::on_private_state_data(sync, io, peer, &rlp),
 _ => {
-debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id());
+trace!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id());
 Ok(())
 }
 };
 
 match result {
 Err(DownloaderImportError::Invalid) => {
-debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id.id());
+trace!(target:"sync", "{} -> Invalid packet {}", peer, packet_id.id());
 io.disable_peer(peer);
 sync.deactivate_peer(io, peer);
 },
@@ -96,7 +96,7 @@ impl SyncHandler {
 },
 }
 } else {
-debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
+trace!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
 }
 }
 
@@ -117,14 +117,14 @@ impl SyncHandler {
 sync.active_peers.remove(&peer_id);
 
 if sync.state == SyncState::SnapshotManifest {
-// Check if we are asking other peers for
-// the snapshot manifest as well.
-// If not, return to initial state
-let still_asking_manifest = sync.peers.iter()
+// Check if we are asking other peers for a snapshot manifest as well. If not,
+// set our state to initial state (`Idle` or `WaitingPeers`).
+let still_seeking_manifest = sync.peers.iter()
 .filter(|&(id, p)| sync.active_peers.contains(id) && p.asking == PeerAsking::SnapshotManifest)
-.next().is_none();
+.next().is_some();
 
-if still_asking_manifest {
+if !still_seeking_manifest {
+warn!(target: "snapshot_sync", "The peer we were downloading a snapshot from ({}) went away. Retrying.", peer_id);
 sync.state = ChainSync::get_init_state(sync.warp_sync, io.chain());
 }
 }
@@ -371,18 +371,18 @@ impl SyncHandler {
 let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
 
 if !sync.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) {
-debug!(target: "sync", "{}: Ignored unexpected headers", peer_id);
+trace!(target: "sync", "{}: Ignored unexpected headers", peer_id);
 return Ok(());
 }
 let expected_hash = match expected_hash {
 Some(hash) => hash,
 None => {
-debug!(target: "sync", "{}: Ignored unexpected headers (expected_hash is None)", peer_id);
+trace!(target: "sync", "{}: Ignored unexpected headers (expected_hash is None)", peer_id);
 return Ok(());
 }
 };
 if !allowed {
-debug!(target: "sync", "{}: Ignored unexpected headers (peer not allowed)", peer_id);
+trace!(target: "sync", "{}: Ignored unexpected headers (peer not allowed)", peer_id);
 return Ok(());
 }
 
@@ -466,12 +466,12 @@ impl SyncHandler {
 /// Called when snapshot manifest is downloaded from a peer.
 fn on_snapshot_manifest(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
 if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
-trace!(target: "sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id);
+trace!(target: "snapshot_sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id);
 return Ok(());
 }
 sync.clear_peer_download(peer_id);
 if !sync.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || sync.state != SyncState::SnapshotManifest {
-trace!(target: "sync", "{}: Ignored unexpected/expired manifest", peer_id);
+trace!(target: "snapshot_sync", "{}: Ignored unexpected/expired manifest", peer_id);
 return Ok(());
 }
 
@@ -482,10 +482,12 @@
 .map_or(false, |(l, h)| manifest.version >= l && manifest.version <= h);
 
 if !is_supported_version {
-trace!(target: "sync", "{}: Snapshot manifest version not supported: {}", peer_id, manifest.version);
+warn!(target: "snapshot_sync", "{}: Snapshot manifest version not supported: {}", peer_id, manifest.version);
 return Err(DownloaderImportError::Invalid);
 }
 sync.snapshot.reset_to(&manifest, &keccak(manifest_rlp.as_raw()));
+debug!(target: "snapshot_sync", "{}: Peer sent a snapshot manifest we can use. Block number #{}, block chunks: {}, state chunks: {}",
+peer_id, manifest.block_number, manifest.block_hashes.len(), manifest.state_hashes.len());
 io.snapshot_service().begin_restore(manifest);
 sync.state = SyncState::SnapshotData;
 
@@ -495,56 +497,56 @@
 /// Called when snapshot data is downloaded from a peer.
 fn on_snapshot_data(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
 if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
-trace!(target: "sync", "Ignoring snapshot data from unconfirmed peer {}", peer_id);
+trace!(target: "snapshot_sync", "Ignoring snapshot data from unconfirmed peer {}", peer_id);
 return Ok(());
 }
 sync.clear_peer_download(peer_id);
 if !sync.reset_peer_asking(peer_id, PeerAsking::SnapshotData) || (sync.state != SyncState::SnapshotData && sync.state != SyncState::SnapshotWaiting) {
-trace!(target: "sync", "{}: Ignored unexpected snapshot data", peer_id);
+trace!(target: "snapshot_sync", "{}: Ignored unexpected snapshot data", peer_id);
 return Ok(());
 }
 
 // check service status
 let status = io.snapshot_service().status();
 match status {
 RestorationStatus::Inactive | RestorationStatus::Failed => {
-trace!(target: "sync", "{}: Snapshot restoration aborted", peer_id);
+trace!(target: "snapshot_sync", "{}: Snapshot restoration status: {:?}", peer_id, status);
 sync.state = SyncState::WaitingPeers;
 
 // only note bad if restoration failed.
 if let (Some(hash), RestorationStatus::Failed) = (sync.snapshot.snapshot_hash(), status) {
-trace!(target: "sync", "Noting snapshot hash {} as bad", hash);
+debug!(target: "snapshot_sync", "Marking snapshot manifest hash {} as bad", hash);
 sync.snapshot.note_bad(hash);
 }
 
 sync.snapshot.clear();
 return Ok(());
 },
 RestorationStatus::Initializing { .. } => {
-trace!(target: "warp", "{}: Snapshot restoration is initializing", peer_id);
+trace!(target: "snapshot_sync", "{}: Snapshot restoration is initializing. Can't accept data right now.", peer_id);
 return Ok(());
 }
 RestorationStatus::Finalizing => {
-trace!(target: "warp", "{}: Snapshot finalizing restoration", peer_id);
+trace!(target: "snapshot_sync", "{}: Snapshot finalizing restoration. Can't accept data right now.", peer_id);
 return Ok(());
 }
 RestorationStatus::Ongoing { .. } => {
-trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id);
+trace!(target: "snapshot_sync", "{}: Snapshot restoration is ongoing", peer_id);
 },
 }
 
 let snapshot_data: Bytes = r.val_at(0)?;
 match sync.snapshot.validate_chunk(&snapshot_data) {
 Ok(ChunkType::Block(hash)) => {
-trace!(target: "sync", "{}: Processing block chunk", peer_id);
+trace!(target: "snapshot_sync", "{}: Processing block chunk", peer_id);
 io.snapshot_service().restore_block_chunk(hash, snapshot_data);
 }
 Ok(ChunkType::State(hash)) => {
-trace!(target: "sync", "{}: Processing state chunk", peer_id);
+trace!(target: "snapshot_sync", "{}: Processing state chunk", peer_id);
 io.snapshot_service().restore_state_chunk(hash, snapshot_data);
 }
 Err(()) => {
-trace!(target: "sync", "{}: Got bad snapshot chunk", peer_id);
+trace!(target: "snapshot_sync", "{}: Got bad snapshot chunk", peer_id);
 io.disconnect_peer(peer_id);
 return Ok(());
 }
@@ -566,7 +568,7 @@ impl SyncHandler {
 let warp_protocol = warp_protocol_version != 0;
 let private_tx_protocol = warp_protocol_version >= PAR_PROTOCOL_VERSION_3.0;
 let peer = PeerInfo {
-protocol_version: protocol_version,
+protocol_version,
 network_id: r.val_at(1)?,
 difficulty: Some(r.val_at(2)?),
 latest_hash: r.val_at(3)?,
@@ -595,15 +597,17 @@ impl SyncHandler {
 latest:{}, \
 genesis:{}, \
 snapshot:{:?}, \
-private_tx_enabled:{})",
+private_tx_enabled:{}, \
+client_version: {})",
 peer_id,
 peer.protocol_version,
 peer.network_id,
 peer.difficulty,
 peer.latest_hash,
 peer.genesis,
 peer.snapshot_number,
-peer.private_tx_enabled
+peer.private_tx_enabled,
+peer.client_version,
 );
 if io.is_expired() {
 trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_version(peer_id));
