
Commit e512bf4

dvdplm authored and niklasad1 committed
Snapshot restoration overhaul (#11219)
* Comments and todos. Use `snapshot_sync` as logging target
* Fix compilation
* More todos, more logs
* Fix picking snapshot peer: prefer the one with the highest block number. More docs, comments, todos
* Adjust WAIT_PEERS_TIMEOUT to be a multiple of MAINTAIN_SYNC_TIMER to try to fix snapshot startup problems. Docs, todos, comments
* Tabs
* Formatting
* Don't build new rlp::EMPTY_LIST_RLP instances
* Dial down debug logging
* Don't warn about missing hashes in the manifest: it's normal. Log client version on peer connect
* Cleanup
* Do not skip snapshots further away than 30k blocks from the highest block seen. Currently we look for peers that seed snapshots close to the highest block seen on the network (where "close" means within 30k blocks). When a node starts up we wait for some time (5sec, increased here to 10sec) to let peers connect, and if we have found a suitable peer to sync a snapshot from at the end of that delay, we start the download; if none is found and --warp-barrier is used we stall, otherwise we start a slow-sync. When looking for a suitable snapshot, we use the highest block seen on the network to check if a peer has a snapshot that is within 30k blocks of that highest block number. This means that in a situation where all available snapshots are older than that, we will often fail to start a snapshot at all. What's worse is that the longer we delay starting a snapshot sync (to let more peers connect, in the hope of finding a good snapshot), the more likely we are to have seen a high block and thus the more likely we become to accept a snapshot. This commit removes the comparison with the highest block number entirely and picks the best snapshot we find within 10sec.
* Lockfile
* Add a `ChunkType::Dupe` variant so that we do not disconnect a peer if they happen to send us a duplicate chunk (just ignore the chunk and keep going). Resolve some documentation todos, add more
* Tweak log message
* Don't warp sync twice: check if our own block is beyond the given warp barrier (can happen after we've completed a warp sync but are not quite yet synced up to the tip) and if so, don't sync. More docs, resolve todos. Dial down some `sync` debug-level logging to trace
* Avoid iterating over all snapshot block/state hashes to find the next work item (see the sketch after this list): use a HashSet instead of a Vec and remove items from the set as chunks are processed. Calculate and store the total number of chunks in the `Snapshot` struct instead of counting pending chunks each time.
* Address review grumbles
* Log correct number of bytes written to disk
* Revert ChunkType::Dup change
* Whitespace grumble
* Cleanup debugging code
* Fix docs
* Fix import and a typo
* Fix test impl
* Use `indexmap::IndexSet` to ensure chunk hashes are accessed in order
* Revert increased SNAPSHOT_MANIFEST_TIMEOUT: 5sec should be enough
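A minimal sketch of the chunk-bookkeeping change described above, with hypothetical names (`PendingChunks`, `next_to_fetch` and `mark_done` are illustrative, not the PR's actual types): using `indexmap::IndexSet`, membership checks and removals are cheap, the hashes keep their manifest order, and the total chunk count is computed once up front.

// Hedged sketch, not the PR's code: names are invented for illustration only.
use ethereum_types::H256;
use indexmap::IndexSet;

struct PendingChunks {
	/// Chunk hashes still to download, kept in manifest (insertion) order.
	pending: IndexSet<H256>,
	/// Total number of chunks, computed once instead of on every status query.
	total: usize,
}

impl PendingChunks {
	fn new(manifest_hashes: Vec<H256>) -> Self {
		let pending: IndexSet<H256> = manifest_hashes.into_iter().collect();
		let total = pending.len();
		PendingChunks { pending, total }
	}

	/// Next chunk to request: the first hash still pending, in insertion order.
	fn next_to_fetch(&self) -> Option<H256> {
		self.pending.iter().next().copied()
	}

	/// Mark a chunk as processed. Returns `false` for a duplicate/unknown hash,
	/// which the caller can simply ignore instead of penalising the peer.
	fn mark_done(&mut self, hash: &H256) -> bool {
		self.pending.shift_remove(hash)
	}

	/// How many chunks have been processed so far.
	fn done(&self) -> usize {
		self.total - self.pending.len()
	}
}

An `IndexSet` is used here rather than a plain `HashSet` so that the next chunk to download is picked in a deterministic order instead of depending on hash order.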
1 parent cd5e43d commit e512bf4

File tree

17 files changed, +793 -296 lines


Cargo.lock

Lines changed: 43 additions & 39 deletions
Some generated files are not rendered by default.

ethcore/snapshot/src/traits.rs

Lines changed: 162 additions & 0 deletions
@@ -0,0 +1,162 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.

// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.

use std::sync::{Arc, atomic::AtomicBool};

use blockchain::{BlockChain, BlockChainDB};
use bytes::Bytes;
use client_traits::{BlockChainClient, BlockInfo, DatabaseRestore, BlockChainReset};
use common_types::{
	ids::BlockId,
	errors::{EthcoreError as Error, SnapshotError},
	snapshot::{ManifestData, ChunkSink, Progress, RestorationStatus},
};
use engine::Engine;
use ethereum_types::H256;
use parking_lot::RwLock;

use crate::io::SnapshotWriter;

/// The interface for a snapshot network service.
/// This handles:
/// - restoration of snapshots to temporary databases.
/// - responding to queries for snapshot manifests and chunks
pub trait SnapshotService : Sync + Send {
	/// Query the most recent manifest data.
	fn manifest(&self) -> Option<ManifestData>;

	/// Get the supported range of snapshot version numbers.
	/// `None` indicates warp sync isn't supported by the consensus engine.
	fn supported_versions(&self) -> Option<(u64, u64)>;

	/// Returns a list of the completed chunks
	fn completed_chunks(&self) -> Option<Vec<H256>>;

	/// Get raw chunk for a given hash.
	fn chunk(&self, hash: H256) -> Option<Bytes>;

	/// Ask the snapshot service for the restoration status.
	fn status(&self) -> RestorationStatus;

	/// Begin snapshot restoration.
	/// If a restoration is in progress, this will reset it and clear all data.
	fn begin_restore(&self, manifest: ManifestData);

	/// Abort an in-progress restoration if there is one.
	fn abort_restore(&self);

	/// Feed a raw state chunk to the service to be processed asynchronously.
	/// no-op if not currently restoring.
	fn restore_state_chunk(&self, hash: H256, chunk: Bytes);

	/// Feed a raw block chunk to the service to be processed asynchronously.
	/// no-op if currently restoring.
	fn restore_block_chunk(&self, hash: H256, chunk: Bytes);

	/// Abort in-progress snapshotting if there is one.
	fn abort_snapshot(&self);

	/// Shutdown the Snapshot Service by aborting any ongoing restore
	fn shutdown(&self);
}

/// Restore from secondary snapshot chunks.
pub trait Rebuilder: Send {
	/// Feed a chunk, potentially out of order.
	///
	/// Check `abort_flag` periodically while doing heavy work. If set to `false`, should bail with
	/// `Error::RestorationAborted`.
	fn feed(
		&mut self,
		chunk: &[u8],
		engine: &dyn Engine,
		abort_flag: &AtomicBool,
	) -> Result<(), Error>;

	/// Finalize the restoration. Will be done after all chunks have been
	/// fed successfully.
	///
	/// This should apply the necessary "glue" between chunks,
	/// and verify against the restored state.
	fn finalize(&mut self) -> Result<(), Error>;
}

/// Components necessary for snapshot creation and restoration.
pub trait SnapshotComponents: Send {
	/// Create secondary snapshot chunks; these corroborate the state data
	/// in the state chunks.
	///
	/// Chunks shouldn't exceed the given preferred size, and should be fed
	/// uncompressed into the sink.
	///
	/// This will vary by consensus engine, so it's exposed as a trait.
	fn chunk_all(
		&mut self,
		chain: &BlockChain,
		block_at: H256,
		chunk_sink: &mut ChunkSink,
		progress: &RwLock<Progress>,
		preferred_size: usize,
	) -> Result<(), SnapshotError>;

	/// Create a rebuilder, which will have chunks fed into it in arbitrary
	/// order and then be finalized.
	///
	/// The manifest, a database, and fresh `BlockChain` are supplied.
	///
	/// The engine passed to the `Rebuilder` methods will be the same instance
	/// that created the `SnapshotComponents`.
	fn rebuilder(
		&self,
		chain: BlockChain,
		db: Arc<dyn BlockChainDB>,
		manifest: &ManifestData,
	) -> Result<Box<dyn Rebuilder>, Error>;

	/// Minimum supported snapshot version number.
	fn min_supported_version(&self) -> u64;

	/// Current version number
	fn current_version(&self) -> u64;
}

/// Snapshot related functionality
pub trait SnapshotClient: BlockChainClient + BlockInfo + DatabaseRestore + BlockChainReset {
	/// Take a snapshot at the given block.
	/// If the BlockId is 'Latest', this will default to 1000 blocks behind.
	fn take_snapshot<W: SnapshotWriter + Send>(
		&self,
		writer: W,
		at: BlockId,
		p: &RwLock<Progress>,
	) -> Result<(), Error>;
}

/// Helper trait for broadcasting a block to take a snapshot at.
pub trait Broadcast: Send + Sync {
	/// Start a snapshot from the given block number.
	fn request_snapshot_at(&self, num: u64);
}

/// Helper trait for transforming hashes to block numbers and checking if syncing.
pub trait Oracle: Send + Sync {
	/// Maps a block hash to a block number
	fn to_number(&self, hash: H256) -> Option<u64>;

	/// Are we currently syncing?
	fn is_major_importing(&self) -> bool;
}
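
The `Rebuilder::feed` docs above describe an abort contract: heavy work should periodically check `abort_flag` and bail out with `Error::RestorationAborted` once it is cleared. A rough, self-contained sketch of that pattern, under assumed names (`feed_items`, `process_item` and the local `RestoreError` enum are illustrative, not the crate's real types):

use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug)]
enum RestoreError {
	/// Mirrors the `Error::RestorationAborted` case mentioned in the trait docs.
	RestorationAborted,
}

fn feed_items(items: &[u64], abort_flag: &AtomicBool) -> Result<(), RestoreError> {
	for item in items {
		// Check periodically while doing heavy work; `false` means "stop now".
		if !abort_flag.load(Ordering::SeqCst) {
			return Err(RestoreError::RestorationAborted);
		}
		process_item(*item);
	}
	Ok(())
}

fn process_item(_item: u64) {
	// Placeholder for the per-chunk decoding/journalling work.
}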

ethcore/src/snapshot/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -479,7 +479,6 @@ impl StateRebuilder {
 		StateDB::commit_bloom(&mut batch, bloom_journal)?;
 		self.db.inject(&mut batch)?;
 		backing.write_buffered(batch);
-		trace!(target: "snapshot", "current state root: {:?}", self.state_root);
 		Ok(())
 	}

ethcore/src/snapshot/service.rs

Lines changed: 2 additions & 1 deletion
@@ -141,6 +141,7 @@ impl Restoration {
 
 		if let Some(ref mut writer) = self.writer.as_mut() {
 			writer.write_state_chunk(hash, chunk)?;
+			trace!(target: "snapshot", "Wrote {}/{} bytes of state to db/disk. Current state root: {:?}", len, chunk.len(), self.state.state_root());
 		}
 
 		self.state_chunks_left.remove(&hash);
@@ -767,7 +768,7 @@ impl Service {
 					false => Ok(())
 				}
 			}
-			other => other.map(drop),
+			Err(e) => Err(e)
 		};
 		(res, db)
 	}

ethcore/sync/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ hash-db = "0.12.4"
 keccak-hash = "0.2.0"
 keccak-hasher = { path = "../../util/keccak-hasher" }
 kvdb = "0.1"
+indexmap = "1.3.0"
 log = "0.4"
 macros = { path = "../../util/macros" }
 parity-bytes = "0.1"

ethcore/sync/src/api.rs

Lines changed: 1 addition & 1 deletion
@@ -311,7 +311,7 @@ pub struct EthSync {
 	light_subprotocol_name: [u8; 3],
 	/// Priority tasks notification channel
 	priority_tasks: Mutex<mpsc::Sender<PriorityTask>>,
-	/// for state tracking
+	/// Track the sync state: are we importing or verifying blocks?
 	is_major_syncing: Arc<AtomicBool>
 }

ethcore/sync/src/block_sync.rs

Lines changed: 1 addition & 1 deletion
@@ -302,7 +302,7 @@ impl BlockDownloader {
 				}
 			}
 		}
-
+		// Update the highest block number seen on the network from the header.
 		if let Some((number, _)) = last_header {
 			if self.highest_block.as_ref().map_or(true, |n| number > *n) {
 				self.highest_block = Some(number);
