Merge pull request #5019 from paritytech/snapshot-v2

Fine grained snapshot chunking
This commit is contained in:
Robert Habermeier 2017-03-27 16:57:02 +02:00 committed by GitHub
commit 1ca1a4b1cc
17 changed files with 180 additions and 84 deletions

1
Cargo.lock generated
View File

@ -414,6 +414,7 @@ dependencies = [
"evmjit 1.7.0",
"hardware-wallet 1.7.0",
"hyper 0.10.0-a.0 (git+https://github.com/paritytech/hyper)",
"itertools 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -27,6 +27,7 @@ byteorder = "1.0"
transient-hashmap = "0.4"
linked-hash-map = "0.3.0"
lru-cache = "0.1.0"
itertools = "0.5"
ethabi = "1.0.0"
evmjit = { path = "../evmjit", optional = true }
clippy = { version = "0.0.103", optional = true}

View File

@ -108,6 +108,7 @@ extern crate hardware_wallet;
extern crate stats;
extern crate ethcore_logger;
extern crate num;
extern crate itertools;
#[macro_use]
extern crate log;

View File

@ -23,6 +23,7 @@ use snapshot::Error;
use util::{U256, H256, Bytes, HashDB, SHA3_EMPTY, SHA3_NULL_RLP};
use util::trie::{TrieDB, Trie};
use rlp::{RlpStream, UntrustedRlp};
use itertools::Itertools;
use std::collections::HashSet;
@ -60,55 +61,53 @@ impl CodeState {
}
}
// walk the account's storage trie, returning an RLP item containing the
// account properties and the storage.
pub fn to_fat_rlp(acc: &BasicAccount, acct_db: &AccountDB, used_code: &mut HashSet<H256>) -> Result<Bytes, Error> {
// walk the account's storage trie, returning a vector of RLP items containing the
// account properties and the storage. Each item contains at most `max_storage_items`
// storage records split according to snapshot format definition.
pub fn to_fat_rlps(acc: &BasicAccount, acct_db: &AccountDB, used_code: &mut HashSet<H256>, max_storage_items: usize) -> Result<Vec<Bytes>, Error> {
if acc == &ACC_EMPTY {
return Ok(::rlp::NULL_RLP.to_vec());
return Ok(vec![::rlp::NULL_RLP.to_vec()]);
}
let db = TrieDB::new(acct_db, &acc.storage_root)?;
let mut pairs = Vec::new();
let chunks = db.iter()?.chunks(max_storage_items);
let pair_chunks = chunks.into_iter().map(|chunk| chunk.collect());
pair_chunks.pad_using(1, |_| Vec::new(), ).map(|pairs| {
let mut stream = RlpStream::new_list(pairs.len());
for item in db.iter()? {
let (k, v) = item?;
pairs.push((k, v));
}
for r in pairs {
let (k, v) = r?;
stream.begin_list(2).append(&k).append(&&*v);
}
let mut stream = RlpStream::new_list(pairs.len());
let pairs_rlp = stream.out();
for (k, v) in pairs {
stream.begin_list(2).append(&k).append(&&*v);
}
let mut account_stream = RlpStream::new_list(5);
account_stream.append(&acc.nonce)
.append(&acc.balance);
let pairs_rlp = stream.out();
let mut account_stream = RlpStream::new_list(5);
account_stream.append(&acc.nonce)
.append(&acc.balance);
// [has_code, code_hash].
if acc.code_hash == SHA3_EMPTY {
account_stream.append(&CodeState::Empty.raw()).append_empty_data();
} else if used_code.contains(&acc.code_hash) {
account_stream.append(&CodeState::Hash.raw()).append(&acc.code_hash);
} else {
match acct_db.get(&acc.code_hash) {
Some(c) => {
used_code.insert(acc.code_hash.clone());
account_stream.append(&CodeState::Inline.raw()).append(&&*c);
}
None => {
warn!("code lookup failed during snapshot");
account_stream.append(&false).append_empty_data();
// [has_code, code_hash].
if acc.code_hash == SHA3_EMPTY {
account_stream.append(&CodeState::Empty.raw()).append_empty_data();
} else if used_code.contains(&acc.code_hash) {
account_stream.append(&CodeState::Hash.raw()).append(&acc.code_hash);
} else {
match acct_db.get(&acc.code_hash) {
Some(c) => {
used_code.insert(acc.code_hash.clone());
account_stream.append(&CodeState::Inline.raw()).append(&&*c);
}
None => {
warn!("code lookup failed during snapshot");
account_stream.append(&false).append_empty_data();
}
}
}
}
account_stream.append_raw(&pairs_rlp, 1);
Ok(account_stream.out())
account_stream.append_raw(&pairs_rlp, 1);
Ok(account_stream.out())
}).collect()
}
// decode a fat rlp, and rebuild the storage trie as we go.
@ -117,6 +116,7 @@ pub fn to_fat_rlp(acc: &BasicAccount, acct_db: &AccountDB, used_code: &mut HashS
pub fn from_fat_rlp(
acct_db: &mut AccountDBMut,
rlp: UntrustedRlp,
mut storage_root: H256,
) -> Result<(BasicAccount, Option<Bytes>), Error> {
use util::{TrieDBMut, TrieMut};
@ -148,10 +148,12 @@ pub fn from_fat_rlp(
}
};
let mut storage_root = H256::zero();
{
let mut storage_trie = TrieDBMut::new(acct_db, &mut storage_root);
let mut storage_trie = if storage_root.is_zero() {
TrieDBMut::new(acct_db, &mut storage_root)
} else {
TrieDBMut::from_existing(acct_db, &mut storage_root)?
};
let pairs = rlp.at(4)?;
for pair_rlp in pairs.iter() {
let k: Bytes = pair_rlp.val_at(0)?;
@ -184,7 +186,7 @@ mod tests {
use std::collections::HashSet;
use super::{ACC_EMPTY, to_fat_rlp, from_fat_rlp};
use super::{ACC_EMPTY, to_fat_rlps, from_fat_rlp};
#[test]
fn encoding_basic() {
@ -201,9 +203,9 @@ mod tests {
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp), account);
let fat_rlp = to_fat_rlp(&account, &AccountDB::new(db.as_hashdb(), &addr), &mut Default::default()).unwrap();
let fat_rlp = UntrustedRlp::new(&fat_rlp);
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp).unwrap().0, account);
let fat_rlps = to_fat_rlps(&account, &AccountDB::new(db.as_hashdb(), &addr), &mut Default::default(), usize::max_value()).unwrap();
let fat_rlp = UntrustedRlp::new(&fat_rlps[0]);
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account);
}
#[test]
@ -226,9 +228,40 @@ mod tests {
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp), account);
let fat_rlp = to_fat_rlp(&account, &AccountDB::new(db.as_hashdb(), &addr), &mut Default::default()).unwrap();
let fat_rlp = UntrustedRlp::new(&fat_rlp);
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp).unwrap().0, account);
let fat_rlp = to_fat_rlps(&account, &AccountDB::new(db.as_hashdb(), &addr), &mut Default::default(), usize::max_value()).unwrap();
let fat_rlp = UntrustedRlp::new(&fat_rlp[0]);
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account);
}
#[test]
fn encoding_storage_split() {
let mut db = get_temp_state_db();
let addr = Address::random();
let account = {
let acct_db = AccountDBMut::new(db.as_hashdb_mut(), &addr);
let mut root = SHA3_NULL_RLP;
fill_storage(acct_db, &mut root, &mut H256::zero());
BasicAccount {
nonce: 25.into(),
balance: 987654321.into(),
storage_root: root,
code_hash: SHA3_EMPTY,
}
};
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp), account);
let fat_rlps = to_fat_rlps(&account, &AccountDB::new(db.as_hashdb(), &addr), &mut Default::default(), 100).unwrap();
let mut root = SHA3_NULL_RLP;
let mut restored_account = None;
for rlp in fat_rlps {
let fat_rlp = UntrustedRlp::new(&rlp);
restored_account = Some(from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp, root).unwrap().0);
root = restored_account.as_ref().unwrap().storage_root.clone();
}
assert_eq!(restored_account, Some(account));
}
#[test]
@ -264,18 +297,18 @@ mod tests {
let mut used_code = HashSet::new();
let fat_rlp1 = to_fat_rlp(&account1, &AccountDB::new(db.as_hashdb(), &addr1), &mut used_code).unwrap();
let fat_rlp2 = to_fat_rlp(&account2, &AccountDB::new(db.as_hashdb(), &addr2), &mut used_code).unwrap();
let fat_rlp1 = to_fat_rlps(&account1, &AccountDB::new(db.as_hashdb(), &addr1), &mut used_code, usize::max_value()).unwrap();
let fat_rlp2 = to_fat_rlps(&account2, &AccountDB::new(db.as_hashdb(), &addr2), &mut used_code, usize::max_value()).unwrap();
assert_eq!(used_code.len(), 1);
let fat_rlp1 = UntrustedRlp::new(&fat_rlp1);
let fat_rlp2 = UntrustedRlp::new(&fat_rlp2);
let fat_rlp1 = UntrustedRlp::new(&fat_rlp1[0]);
let fat_rlp2 = UntrustedRlp::new(&fat_rlp2[0]);
let (acc, maybe_code) = from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr2), fat_rlp2).unwrap();
let (acc, maybe_code) = from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr2), fat_rlp2, H256::zero()).unwrap();
assert!(maybe_code.is_none());
assert_eq!(acc, account2);
let (acc, maybe_code) = from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr1), fat_rlp1).unwrap();
let (acc, maybe_code) = from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr1), fat_rlp1, H256::zero()).unwrap();
assert_eq!(maybe_code, Some(b"this is definitely code".to_vec()));
assert_eq!(acc, account1);
}
@ -285,7 +318,7 @@ mod tests {
let mut db = get_temp_state_db();
let mut used_code = HashSet::new();
assert_eq!(to_fat_rlp(&ACC_EMPTY, &AccountDB::new(db.as_hashdb(), &Address::default()), &mut used_code).unwrap(), ::rlp::NULL_RLP.to_vec());
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &Address::default()), UntrustedRlp::new(&::rlp::NULL_RLP)).unwrap(), (ACC_EMPTY, None));
assert_eq!(to_fat_rlps(&ACC_EMPTY, &AccountDB::new(db.as_hashdb(), &Address::default()), &mut used_code, usize::max_value()).unwrap(), vec![::rlp::NULL_RLP.to_vec()]);
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &Address::default()), UntrustedRlp::new(&::rlp::NULL_RLP), H256::zero()).unwrap(), (ACC_EMPTY, None));
}
}

View File

@ -53,6 +53,8 @@ pub enum Error {
Decoder(DecoderError),
/// Io error.
Io(::std::io::Error),
/// Snapshot version is not supported.
VersionNotSupported(u64),
}
impl fmt::Display for Error {
@ -73,6 +75,7 @@ impl fmt::Display for Error {
Error::Io(ref err) => err.fmt(f),
Error::Decoder(ref err) => err.fmt(f),
Error::Trie(ref err) => err.fmt(f),
Error::VersionNotSupported(ref ver) => write!(f, "Snapshot version {} is not supprted.", ver),
}
}
}

View File

@ -31,6 +31,8 @@ use rlp::{self, Encodable, RlpStream, UntrustedRlp};
use super::ManifestData;
const SNAPSHOT_VERSION: u64 = 2;
/// Something which can write snapshots.
/// Writing the same chunk multiple times will lead to implementation-defined
/// behavior, and is not advised.
@ -118,8 +120,9 @@ impl SnapshotWriter for PackedWriter {
fn finish(mut self, manifest: ManifestData) -> io::Result<()> {
// we ignore the hashes fields of the manifest under the assumption that
// they are consistent with ours.
let mut stream = RlpStream::new_list(5);
let mut stream = RlpStream::new_list(6);
stream
.append(&SNAPSHOT_VERSION)
.append_list(&self.state_hashes)
.append_list(&self.block_hashes)
.append(&manifest.state_root)
@ -221,7 +224,7 @@ impl PackedReader {
/// Create a new `PackedReader` for the file at the given path.
/// This will fail if any io errors are encountered or the file
/// is not a valid packed snapshot.
pub fn new(path: &Path) -> Result<Option<Self>, ::error::Error> {
pub fn new(path: &Path) -> Result<Option<Self>, ::snapshot::error::Error> {
let mut file = File::open(path)?;
let file_len = file.metadata()?.len();
if file_len < 8 {
@ -255,15 +258,26 @@ impl PackedReader {
let rlp = UntrustedRlp::new(&manifest_buf);
let state: Vec<ChunkInfo> = rlp.list_at(0)?;
let blocks: Vec<ChunkInfo> = rlp.list_at(1)?;
let (start, version) = if rlp.item_count()? == 5 {
(0, 1)
} else {
(1, rlp.val_at(0)?)
};
if version > SNAPSHOT_VERSION {
return Err(::snapshot::error::Error::VersionNotSupported(version));
}
let state: Vec<ChunkInfo> = rlp.list_at(0 + start)?;
let blocks: Vec<ChunkInfo> = rlp.list_at(1 + start)?;
let manifest = ManifestData {
version: version,
state_hashes: state.iter().map(|c| c.0).collect(),
block_hashes: blocks.iter().map(|c| c.0).collect(),
state_root: rlp.val_at(2)?,
block_number: rlp.val_at(3)?,
block_hash: rlp.val_at(4)?,
state_root: rlp.val_at(2 + start)?,
block_number: rlp.val_at(3 + start)?,
block_hash: rlp.val_at(4 + start)?,
};
Ok(Some(PackedReader {
@ -346,7 +360,7 @@ mod tests {
use util::sha3::Hashable;
use snapshot::ManifestData;
use super::{SnapshotWriter, SnapshotReader, PackedWriter, PackedReader, LooseWriter, LooseReader};
use super::{SnapshotWriter, SnapshotReader, PackedWriter, PackedReader, LooseWriter, LooseReader, SNAPSHOT_VERSION};
const STATE_CHUNKS: &'static [&'static [u8]] = &[b"dog", b"cat", b"hello world", b"hi", b"notarealchunk"];
const BLOCK_CHUNKS: &'static [&'static [u8]] = &[b"hello!", b"goodbye!", b"abcdefg", b"hijklmnop", b"qrstuvwxy", b"and", b"z"];
@ -372,6 +386,7 @@ mod tests {
}
let manifest = ManifestData {
version: SNAPSHOT_VERSION,
state_hashes: state_hashes,
block_hashes: block_hashes,
state_root: b"notarealroot".sha3(),
@ -410,6 +425,7 @@ mod tests {
}
let manifest = ManifestData {
version: SNAPSHOT_VERSION,
state_hashes: state_hashes,
block_hashes: block_hashes,
state_root: b"notarealroot".sha3(),

View File

@ -56,6 +56,7 @@ pub use self::traits::SnapshotService;
pub use self::watcher::Watcher;
pub use types::snapshot_manifest::ManifestData;
pub use types::restoration_status::RestorationStatus;
pub use types::basic_account::BasicAccount;
pub mod io;
pub mod service;
@ -82,6 +83,9 @@ mod traits {
// Try to have chunks be around 4MB (before compression)
const PREFERRED_CHUNK_SIZE: usize = 4 * 1024 * 1024;
// Try to have chunks be around 4MB (before compression)
const MAX_STORAGE_ENTRIES_PER_ACCOUNT_RECORD: usize = 80_000;
// How many blocks to include in a snapshot, starting from the head of the chain.
const SNAPSHOT_BLOCKS: u64 = 30000;
@ -147,6 +151,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
info!("produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());
let manifest_data = ManifestData {
version: 2,
state_hashes: state_hashes,
block_hashes: block_hashes,
state_root: *state_root,
@ -300,14 +305,14 @@ impl<'a> StateChunker<'a> {
//
// If the buffer is greater than the desired chunk size,
// this will write out the data to disk.
fn push(&mut self, account_hash: Bytes, data: Bytes) -> Result<(), Error> {
fn push(&mut self, account_hash: Bytes, data: Bytes, force_chunk: bool) -> Result<(), Error> {
let pair = {
let mut stream = RlpStream::new_list(2);
stream.append(&account_hash).append_raw(&data, 1);
stream.out()
};
if self.cur_size + pair.len() >= PREFERRED_CHUNK_SIZE {
if force_chunk || self.cur_size + pair.len() >= PREFERRED_CHUNK_SIZE {
self.write_chunk()?;
}
@ -372,8 +377,10 @@ pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex<SnapshotWriter +
let account_db = AccountDB::from_hash(db, account_key_hash);
let fat_rlp = account::to_fat_rlp(&account, &account_db, &mut used_code)?;
chunker.push(account_key, fat_rlp)?;
let fat_rlps = account::to_fat_rlps(&account, &account_db, &mut used_code, MAX_STORAGE_ENTRIES_PER_ACCOUNT_RECORD)?;
for (i, fat_rlp) in fat_rlps.into_iter().enumerate() {
chunker.push(account_key.clone(), fat_rlp, i > 0)?;
}
}
if chunker.cur_size != 0 {
@ -390,6 +397,7 @@ pub struct StateRebuilder {
known_code: HashMap<H256, H256>, // code hashes mapped to first account with this code.
missing_code: HashMap<H256, Vec<H256>>, // maps code hashes to lists of accounts missing that code.
bloom: Bloom,
known_storage_roots: HashMap<H256, H256>, // maps account hashes to last known storage root. Only filled for last account per chunk.
}
impl StateRebuilder {
@ -401,6 +409,7 @@ impl StateRebuilder {
known_code: HashMap::new(),
missing_code: HashMap::new(),
bloom: StateDB::load_bloom(&*db),
known_storage_roots: HashMap::new(),
}
}
@ -418,6 +427,7 @@ impl StateRebuilder {
rlp,
&mut pairs,
&self.known_code,
&mut self.known_storage_roots,
flag
)?;
@ -496,10 +506,11 @@ fn rebuild_accounts(
account_fat_rlps: UntrustedRlp,
out_chunk: &mut [(H256, Bytes)],
known_code: &HashMap<H256, H256>,
known_storage_roots: &mut HashMap<H256, H256>,
abort_flag: &AtomicBool,
) -> Result<RebuiltStatus, ::error::Error> {
let mut status = RebuiltStatus::default();
for (account_rlp, out) in account_fat_rlps.into_iter().zip(out_chunk) {
for (account_rlp, out) in account_fat_rlps.into_iter().zip(out_chunk.iter_mut()) {
if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) }
let hash: H256 = account_rlp.val_at(0)?;
@ -510,7 +521,8 @@ fn rebuild_accounts(
// fill out the storage trie and code while decoding.
let (acc, maybe_code) = {
let mut acct_db = AccountDBMut::from_hash(db, hash);
account::from_fat_rlp(&mut acct_db, fat_rlp)?
let storage_root = known_storage_roots.get(&hash).cloned().unwrap_or(H256::zero());
account::from_fat_rlp(&mut acct_db, fat_rlp, storage_root)?
};
let code_hash = acc.code_hash.clone();
@ -542,6 +554,12 @@ fn rebuild_accounts(
*out = (hash, thin_rlp);
}
if let Some(&(ref hash, ref rlp)) = out_chunk.iter().last() {
known_storage_roots.insert(*hash, ::rlp::decode::<BasicAccount>(rlp).storage_root);
}
if let Some(&(ref hash, ref rlp)) = out_chunk.iter().next() {
known_storage_roots.insert(*hash, ::rlp::decode::<BasicAccount>(rlp).storage_root);
}
Ok(status)
}

View File

@ -656,6 +656,7 @@ mod tests {
assert_eq!(service.status(), RestorationStatus::Inactive);
let manifest = ManifestData {
version: 2,
state_hashes: vec![],
block_hashes: vec![],
state_root: Default::default(),

View File

@ -63,6 +63,7 @@ fn chunk_and_restore(amount: u64) {
let writer = Mutex::new(PackedWriter::new(&snapshot_path).unwrap());
let block_hashes = chunk_blocks(&bc, best_hash, &writer, &Progress::default()).unwrap();
let manifest = ::snapshot::ManifestData {
version: 2,
state_hashes: Vec::new(),
block_hashes: block_hashes,
state_root: ::util::sha3::SHA3_NULL_RLP,
@ -125,6 +126,7 @@ fn checks_flag() {
let chain = BlockChain::new(Default::default(), &genesis, db.clone());
let manifest = ::snapshot::ManifestData {
version: 2,
state_hashes: Vec::new(),
block_hashes: Vec::new(),
state_root: ::util::sha3::SHA3_NULL_RLP,

View File

@ -27,6 +27,7 @@ use super::ManifestData;
#[test]
fn manifest_rlp() {
let manifest = ManifestData {
version: 2,
block_hashes: Vec::new(),
state_hashes: Vec::new(),
block_number: 1234567,

View File

@ -122,6 +122,7 @@ fn guards_delete_folders() {
path.push("restoration");
let manifest = ManifestData {
version: 2,
state_hashes: vec![],
block_hashes: vec![],
block_number: 0,

View File

@ -58,6 +58,7 @@ fn snap_and_restore() {
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default()).unwrap();
writer.into_inner().finish(::snapshot::ManifestData {
version: 2,
state_hashes: state_hashes,
block_hashes: Vec::new(),
state_root: state_root,
@ -121,10 +122,10 @@ fn get_code_from_prev_chunk() {
let mut db = MemoryDB::new();
AccountDBMut::from_hash(&mut db, hash).insert(&code[..]);
let fat_rlp = account::to_fat_rlp(&acc, &AccountDB::from_hash(&db, hash), &mut used_code).unwrap();
let fat_rlp = account::to_fat_rlps(&acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value()).unwrap();
let mut stream = RlpStream::new_list(1);
stream.begin_list(2).append(&hash).append_raw(&fat_rlp, 1);
stream.begin_list(2).append(&hash).append_raw(&fat_rlp[0], 1);
stream.out()
};
@ -170,6 +171,7 @@ fn checks_flag() {
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default()).unwrap();
writer.into_inner().finish(::snapshot::ManifestData {
version: 2,
state_hashes: state_hashes,
block_hashes: Vec::new(),
state_root: state_root,

View File

@ -24,6 +24,8 @@ use util::Bytes;
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub struct ManifestData {
/// Snapshot format version.
pub version: u64,
/// List of state chunk hashes.
pub state_hashes: Vec<H256>,
/// List of block chunk hashes.
@ -39,7 +41,8 @@ pub struct ManifestData {
impl ManifestData {
/// Encode the manifest data to rlp.
pub fn into_rlp(self) -> Bytes {
let mut stream = RlpStream::new_list(5);
let mut stream = RlpStream::new_list(6);
stream.append(&self.version);
stream.append_list(&self.state_hashes);
stream.append_list(&self.block_hashes);
stream.append(&self.state_root);
@ -52,14 +55,20 @@ impl ManifestData {
/// Try to restore manifest data from raw bytes, interpreted as RLP.
pub fn from_rlp(raw: &[u8]) -> Result<Self, DecoderError> {
let decoder = UntrustedRlp::new(raw);
let (start, version) = if decoder.item_count()? == 5 {
(0, 1)
} else {
(1, decoder.val_at(0)?)
};
let state_hashes: Vec<H256> = decoder.list_at(0)?;
let block_hashes: Vec<H256> = decoder.list_at(1)?;
let state_root: H256 = decoder.val_at(2)?;
let block_number: u64 = decoder.val_at(3)?;
let block_hash: H256 = decoder.val_at(4)?;
let state_hashes: Vec<H256> = decoder.list_at(start + 0)?;
let block_hashes: Vec<H256> = decoder.list_at(start + 1)?;
let state_root: H256 = decoder.val_at(start + 2)?;
let block_number: u64 = decoder.val_at(start + 3)?;
let block_hash: H256 = decoder.val_at(start + 4)?;
Ok(ManifestData {
version: version,
state_hashes: state_hashes,
block_hashes: block_hashes,
state_root: state_root,

View File

@ -158,6 +158,8 @@ pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x16;
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
const MIN_SUPPORTED_SNAPSHOT_MANIFEST_VERSION: u64 = 1;
const WAIT_PEERS_TIMEOUT_SEC: u64 = 5;
const STATUS_TIMEOUT_SEC: u64 = 5;
const HEADERS_TIMEOUT_SEC: u64 = 15;
@ -1028,12 +1030,18 @@ impl ChainSync {
let manifest = match ManifestData::from_rlp(manifest_rlp.as_raw()) {
Err(e) => {
trace!(target: "sync", "{}: Ignored bad manifest: {:?}", peer_id, e);
io.disconnect_peer(peer_id);
io.disable_peer(peer_id);
self.continue_sync(io);
return Ok(());
}
Ok(manifest) => manifest,
};
if manifest.version < MIN_SUPPORTED_SNAPSHOT_MANIFEST_VERSION {
trace!(target: "sync", "{}: Snapshot manifest version too low: {}", peer_id, manifest.version);
io.disable_peer(peer_id);
self.continue_sync(io);
return Ok(());
}
self.snapshot.reset_to(&manifest, &manifest_rlp.as_raw().sha3());
io.snapshot_service().begin_restore(manifest);
self.state = SyncState::SnapshotData;

View File

@ -144,6 +144,7 @@ mod test {
let state_chunks: Vec<Bytes> = (0..20).map(|_| H256::random().to_vec()).collect();
let block_chunks: Vec<Bytes> = (0..20).map(|_| H256::random().to_vec()).collect();
let manifest = ManifestData {
version: 2,
state_hashes: state_chunks.iter().map(|data| data.sha3()).collect(),
block_hashes: block_chunks.iter().map(|data| data.sha3()).collect(),
state_root: H256::new(),

View File

@ -49,6 +49,7 @@ impl TestSnapshotService {
let state_chunks: Vec<Bytes> = (0..num_state_chunks).map(|_| H256::random().to_vec()).collect();
let block_chunks: Vec<Bytes> = (0..num_block_chunks).map(|_| H256::random().to_vec()).collect();
let manifest = ManifestData {
version: 2,
state_hashes: state_chunks.iter().map(|data| data.sha3()).collect(),
block_hashes: block_chunks.iter().map(|data| data.sha3()).collect(),
state_root: H256::new(),

View File

@ -380,10 +380,7 @@ impl JournalDB for OverlayRecentDB {
match rc {
0 => {}
1 => {
if cfg!(debug_assertions) && self.backing.get(self.column, &key)?.is_some() {
return Err(BaseDataError::AlreadyExists(key).into());
}
_ if rc > 0 => {
batch.put(self.column, &key, &value)
}
-1 => {
@ -392,7 +389,7 @@ impl JournalDB for OverlayRecentDB {
}
batch.delete(self.column, &key)
}
_ => panic!("Attempted to inject invalid state."),
_ => panic!("Attempted to inject invalid state ({})", rc),
}
}