From 76a7246369c7aea223c52d5747c62afefc518595 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Fri, 5 Aug 2016 17:00:46 +0200
Subject: [PATCH] Snapshot creation and restoration (#1679)

* to_rlp takes self by-reference
* clean up some derefs
* out-of-order insertion for blockchain
* implement block rebuilder without verification
* group block chunk header into struct
* block rebuilder does verification
* integrate snapshot service with client service; flesh out implementation more
* initial implementation of snapshot service
* remove snapshottaker trait
* snapshot writer trait with packed and loose implementations
* write chunks using "snapshotwriter" in service
* have snapshot taking use snapshotwriter
* implement snapshot readers
* back up client dbs when replacing
* use snapshot reader in snapshot service
* describe offset format
* use new get_db_path in parity, allow some errors in service
* blockchain formatting
* implement parity snapshot
* implement snapshot restore
* force blocks to be submitted in order
* fix bug loading block hashes in packed reader
* fix seal field loading
* fix uncle hash computation
* fix a few bugs
* store genesis state in db. reverse block chunk order in packed writer
* allow out-of-order import for blocks
* bring restoration types together
* only snapshot the last 30000 blocks
* restore into overlaydb instead of journaldb
* commit version to database
* use memorydbs and commit directly
* fix trie test compilation
* fix failing tests
* sha3_null_rlp, not H256::zero
* move overlaydb to ref_overlaydb, add new overlaydb without on-disk rc
* port archivedb to new overlaydb
* add deletion mode tests for overlaydb
* use new overlaydb, check state root at end
* share chain info between state and block snapshotting
* create blocks snapshot using blockchain directly
* allow snapshot from arbitrary block, remove panickers from snapshot creation
* begin test framework
* blockchain chunking test
* implement stateproducer::tick
* state snapshot test
* create block and state chunks concurrently, better restoration informant
* fix tests
* add deletion mode tests for overlaydb
* address comments
* more tests
* Fix up tests.
* remove a few printlns
* add a little more documentation to `commit`
* fix tests
* fix ref_overlaydb test names
* snapshot command skeleton
* revert ref_overlaydb renaming
* reimplement snapshot commands
* fix many errors
* everything but inject
* get ethcore compiling
* get snapshot tests passing again
* instrument snapshot commands again
* fix fallout from other changes, mark snapshots as experimental
* optimize injection patterns
* do two injections
* fix up tests
* take snapshots from 1000 blocks before
* address minor comments
* fix a few io crate related errors
* clarify names about total difficulty

[ci skip]
---
 Cargo.lock | 1 +
 ethcore/Cargo.toml | 1 +
 ethcore/src/block.rs | 2 +-
 ethcore/src/block_queue.rs | 18 +-
 ethcore/src/blockchain/blockchain.rs | 120 ++++++-
 ethcore/src/blockchain/extras.rs | 2 +-
 ethcore/src/blockchain/generator/mod.rs | 2 +
 ethcore/src/blockchain/mod.rs | 2 +-
 ethcore/src/client/client.rs | 39 ++-
 ethcore/src/client/mod.rs | 1 -
 ethcore/src/error.rs | 48 ++-
 ethcore/src/externalities.rs | 2 +-
 ethcore/src/json_tests/chain.rs | 2 +-
 ethcore/src/lib.rs | 1 +
 ethcore/src/service.rs | 36 +-
 ethcore/src/snapshot/account.rs | 44 +--
 ethcore/src/snapshot/block.rs | 42 +--
 ethcore/src/snapshot/error.rs | 68 ++++
 ethcore/src/snapshot/io.rs | 343 +++++++++++++++++++
 ethcore/src/snapshot/mod.rs | 350 +++++++++++++------
 ethcore/src/snapshot/service.rs | 435 ++++++++++++++++++++++++
 ethcore/src/snapshot/tests/blocks.rs | 91 +++++
 ethcore/src/snapshot/tests/helpers.rs | 122 +++++++
 ethcore/src/snapshot/tests/mod.rs | 22 ++
 ethcore/src/snapshot/tests/state.rs | 82 +++++
 ethcore/src/spec/spec.rs | 15 +-
 ethcore/src/state.rs | 2 +-
 ethcore/src/tests/client.rs | 6 +-
 ethcore/src/tests/helpers.rs | 6 +-
 ethcore/src/tests/rpc.rs | 2 +-
 ethcore/src/types/blockchain_info.rs | 2 +-
 parity/cli.rs | 4 +
 parity/configuration.rs | 32 ++
 parity/main.rs | 3 +-
 parity/run.rs | 2 +-
 parity/snapshot.rs | 195 +++++++++++
 rpc/src/v1/tests/eth.rs | 2 +-
 sync/src/lib.rs | 2 +-
 util/src/journaldb/mod.rs | 2 +-
 util/src/journaldb/refcounteddb.rs | 2 +-
 util/src/memorydb.rs | 2 +-
 util/src/overlaydb.rs | 2 +-
 42 files changed, 1913 insertions(+), 244 deletions(-)
 create mode 100644 ethcore/src/snapshot/error.rs
 create mode 100644 ethcore/src/snapshot/io.rs
 create mode 100644 ethcore/src/snapshot/service.rs
 create mode 100644 ethcore/src/snapshot/tests/blocks.rs
 create mode 100644 ethcore/src/snapshot/tests/helpers.rs
 create mode 100644 ethcore/src/snapshot/tests/mod.rs
 create mode 100644 ethcore/src/snapshot/tests/state.rs
 create mode 100644 parity/snapshot.rs

diff --git a/Cargo.lock b/Cargo.lock
index 487fea845..46a2fd71c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -258,6 +258,7 @@ dependencies = [
 "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
 "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
 "rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
 "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml
index b87be1087..b02538fb7 100644
--- a/ethcore/Cargo.toml
+++ b/ethcore/Cargo.toml
@@ -34,6 +34,7 @@ ethjson = { path = "../json" }
 ethcore-ipc = { path = "../ipc/rpc" }
 ethstore = { path = "../ethstore" }
 ethcore-ipc-nano = { path
= "../ipc/nano" } +rand = "0.3" [dependencies.hyper] git = "https://github.com/ethcore/hyper" diff --git a/ethcore/src/block.rs b/ethcore/src/block.rs index 44fb1676f..d2477b489 100644 --- a/ethcore/src/block.rs +++ b/ethcore/src/block.rs @@ -40,7 +40,7 @@ impl Block { UntrustedRlp::new(b).as_val::().is_ok() } - /// Get the RLP-encoding of the block without the seal. + /// Get the RLP-encoding of the block with or without the seal. pub fn rlp_bytes(&self, seal: Seal) -> Bytes { let mut block_rlp = RlpStream::new_list(3); self.header.stream_rlp(&mut block_rlp, seal); diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index b65a4ad26..89a620493 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -80,7 +80,7 @@ impl BlockQueueInfo { /// Sorts them ready for blockchain insertion. pub struct BlockQueue { panic_handler: Arc, - engine: Arc>, + engine: Arc, more_to_verify: Arc, verification: Arc, verifiers: Vec>, @@ -140,7 +140,7 @@ struct Verification { impl BlockQueue { /// Creates a new queue instance. - pub fn new(config: BlockQueueConfig, engine: Arc>, message_channel: IoChannel) -> BlockQueue { + pub fn new(config: BlockQueueConfig, engine: Arc, message_channel: IoChannel) -> BlockQueue { let verification = Arc::new(Verification { unverified: Mutex::new(VecDeque::new()), verified: Mutex::new(VecDeque::new()), @@ -196,7 +196,7 @@ impl BlockQueue { } } - fn verify(verification: Arc, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { + fn verify(verification: Arc, engine: Arc, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { while !deleting.load(AtomicOrdering::Acquire) { { let mut more_to_verify = verification.more_to_verify.lock().unwrap(); @@ -226,7 +226,7 @@ impl BlockQueue { }; let block_hash = block.header.hash(); - match verify_block_unordered(block.header, block.bytes, &**engine) { + match verify_block_unordered(block.header, block.bytes, &*engine) { Ok(verified) => { let mut verifying = verification.verifying.lock(); for e in verifying.iter_mut() { @@ -319,7 +319,7 @@ impl BlockQueue { } } - match verify_block_basic(&header, &bytes, &**self.engine) { + match verify_block_basic(&header, &bytes, &*self.engine) { Ok(()) => { self.processing.write().insert(h.clone()); self.verification.unverified.lock().push_back(UnverifiedBlock { header: header, bytes: bytes }); @@ -340,7 +340,7 @@ impl BlockQueue { return; } let mut verified_lock = self.verification.verified.lock(); - let mut verified = verified_lock.deref_mut(); + let mut verified = &mut *verified_lock; let mut bad = self.verification.bad.lock(); let mut processing = self.processing.write(); bad.reserve(block_hashes.len()); @@ -460,7 +460,7 @@ mod tests { fn get_test_queue() -> BlockQueue { let spec = get_test_spec(); let engine = spec.engine; - BlockQueue::new(BlockQueueConfig::default(), Arc::new(engine), IoChannel::disconnected()) + BlockQueue::new(BlockQueueConfig::default(), engine, IoChannel::disconnected()) } #[test] @@ -468,7 +468,7 @@ mod tests { // TODO better test let spec = Spec::new_test(); let engine = spec.engine; - let _ = BlockQueue::new(BlockQueueConfig::default(), Arc::new(engine), IoChannel::disconnected()); + let _ = BlockQueue::new(BlockQueueConfig::default(), engine, IoChannel::disconnected()); } #[test] @@ -531,7 +531,7 @@ mod tests { let engine = spec.engine; let mut config = BlockQueueConfig::default(); config.max_mem_use = super::MIN_MEM_LIMIT; // empty queue uses about 15000 - let queue = BlockQueue::new(config, Arc::new(engine), 
IoChannel::disconnected()); + let queue = BlockQueue::new(config, engine, IoChannel::disconnected()); assert!(!queue.queue_info().is_full()); let mut blocks = get_good_dummy_block_seq(50); for b in blocks.drain(..) { diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index 586fcb575..3fa728686 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -533,6 +533,116 @@ impl BlockChain { } } + /// Inserts a verified, known block from the canonical chain. + /// + /// Can be performed out-of-order, but care must be taken that the final chain is in a correct state. + /// This is used by snapshot restoration. + /// + /// Supply a dummy parent total difficulty when the parent block may not be in the chain. + /// Returns true if the block is disconnected. + pub fn insert_snapshot_block(&self, bytes: &[u8], receipts: Vec, parent_td: Option, is_best: bool) -> bool { + let block = BlockView::new(bytes); + let header = block.header_view(); + let hash = header.sha3(); + + if self.is_known(&hash) { + return false; + } + + assert!(self.pending_best_block.read().is_none()); + + let batch = self.db.transaction(); + + let block_rlp = UntrustedRlp::new(bytes); + let compressed_header = block_rlp.at(0).unwrap().compress(RlpType::Blocks); + let compressed_body = UntrustedRlp::new(&Self::block_to_body(bytes)).compress(RlpType::Blocks); + + // store block in db + batch.put(DB_COL_HEADERS, &hash, &compressed_header).unwrap(); + batch.put(DB_COL_BODIES, &hash, &compressed_body).unwrap(); + + let maybe_parent = self.block_details(&header.parent_hash()); + + if let Some(parent_details) = maybe_parent { + // parent known to be in chain. + let info = BlockInfo { + hash: hash, + number: header.number(), + total_difficulty: parent_details.total_difficulty + header.difficulty(), + location: BlockLocation::CanonChain, + }; + + self.prepare_update(&batch, ExtrasUpdate { + block_hashes: self.prepare_block_hashes_update(bytes, &info), + block_details: self.prepare_block_details_update(bytes, &info), + block_receipts: self.prepare_block_receipts_update(receipts, &info), + transactions_addresses: self.prepare_transaction_addresses_update(bytes, &info), + blocks_blooms: self.prepare_block_blooms_update(bytes, &info), + info: info, + block: bytes + }, is_best); + self.db.write(batch).unwrap(); + + false + } else { + // parent not in the chain yet. we need the parent difficulty to proceed. + let d = parent_td + .expect("parent total difficulty always supplied for first block in chunk. only first block can have missing parent; qed"); + + let info = BlockInfo { + hash: hash, + number: header.number(), + total_difficulty: d + header.difficulty(), + location: BlockLocation::CanonChain, + }; + + let block_details = BlockDetails { + number: header.number(), + total_difficulty: info.total_difficulty, + parent: header.parent_hash(), + children: Vec::new(), + }; + + let mut update = HashMap::new(); + update.insert(hash, block_details); + + self.prepare_update(&batch, ExtrasUpdate { + block_hashes: self.prepare_block_hashes_update(bytes, &info), + block_details: update, + block_receipts: self.prepare_block_receipts_update(receipts, &info), + transactions_addresses: self.prepare_transaction_addresses_update(bytes, &info), + blocks_blooms: self.prepare_block_blooms_update(bytes, &info), + info: info, + block: bytes, + }, is_best); + self.db.write(batch).unwrap(); + + true + } + } + + /// Add a child to a given block. 
Assumes that the block hash is in + /// the chain and the child's parent is this block. + /// + /// Used in snapshots to glue the chunks together at the end. + pub fn add_child(&self, block_hash: H256, child_hash: H256) { + let mut parent_details = self.block_details(&block_hash) + .unwrap_or_else(|| panic!("Invalid block hash: {:?}", block_hash)); + + let batch = self.db.transaction(); + parent_details.children.push(child_hash); + + let mut update = HashMap::new(); + update.insert(block_hash, parent_details); + + self.note_used(CacheID::BlockDetails(block_hash)); + + let mut write_details = self.block_details.write(); + batch.extend_with_cache(DB_COL_EXTRA, &mut *write_details, update, CacheUpdatePolicy::Overwrite); + + self.db.write(batch).unwrap(); + } + #[cfg_attr(feature="dev", allow(similar_names))] /// Inserts the block into backing cache database. /// Expects the block to be valid and already verified. @@ -572,7 +682,7 @@ impl BlockChain { blocks_blooms: self.prepare_block_blooms_update(bytes, &info), info: info.clone(), block: bytes, - }); + }, true); ImportRoute::from(info) } @@ -618,7 +728,7 @@ impl BlockChain { } /// Prepares extras update. - fn prepare_update(&self, batch: &DBTransaction, update: ExtrasUpdate) { + fn prepare_update(&self, batch: &DBTransaction, update: ExtrasUpdate, is_best: bool) { { for hash in update.block_details.keys().cloned() { self.note_used(CacheID::BlockDetails(hash)); @@ -645,7 +755,7 @@ impl BlockChain { // update best block match update.info.location { BlockLocation::Branch => (), - _ => { + _ => if is_best { batch.put(DB_COL_EXTRA, b"best", &update.info.hash).unwrap(); *best_block = Some(BestBlock { hash: update.info.hash, @@ -653,9 +763,8 @@ impl BlockChain { total_difficulty: update.info.total_difficulty, block: update.block.to_vec(), }); - } + }, } - let mut write_hashes = self.pending_block_hashes.write(); let mut write_txs = self.pending_transaction_addresses.write(); @@ -745,6 +854,7 @@ impl BlockChain { } /// This function returns modified block details. + /// Uses the given parent details or attempts to load them from the database. fn prepare_block_details_update(&self, block_bytes: &[u8], info: &BlockInfo) -> HashMap { let block = BlockView::new(block_bytes); let header = block.header_view(); diff --git a/ethcore/src/blockchain/extras.rs b/ethcore/src/blockchain/extras.rs index 619706c00..6bb10276c 100644 --- a/ethcore/src/blockchain/extras.rs +++ b/ethcore/src/blockchain/extras.rs @@ -41,7 +41,7 @@ pub enum ExtrasIndex { fn with_index(hash: &H256, i: ExtrasIndex) -> H264 { let mut result = H264::default(); result[0] = i as u8; - result.deref_mut()[1..].clone_from_slice(hash); + (*result)[1..].clone_from_slice(hash); result } diff --git a/ethcore/src/blockchain/generator/mod.rs b/ethcore/src/blockchain/generator/mod.rs index 88fdec0e4..b02030d4e 100644 --- a/ethcore/src/blockchain/generator/mod.rs +++ b/ethcore/src/blockchain/generator/mod.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +//! Blockchain generator for tests. 
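Taken together, the `insert_snapshot_block` and `add_child` methods added above are what make out-of-order restoration possible: each chunk supplies an explicit parent total difficulty for its first block, blocks whose parents are missing are recorded, and the chain is glued together once every chunk has landed. A minimal sketch of that flow (`restore_chunk` and its inputs are hypothetical; the two `BlockChain` methods are the ones from this patch):

```rust
// Feed one chunk of (block RLP, receipts) pairs, in order. `parent_td` comes
// from the chunk header; only the first block in a chunk can lack a parent.
fn restore_chunk(chain: &BlockChain, blocks: Vec<(Vec<u8>, Vec<Receipt>)>, parent_td: U256) -> Vec<H256> {
    let mut disconnected = Vec::new();
    for (i, (bytes, receipts)) in blocks.into_iter().enumerate() {
        let td = if i == 0 { Some(parent_td) } else { None };
        // insert_snapshot_block returns true when the parent was not yet in the chain.
        if chain.insert_snapshot_block(&bytes, receipts, td, false) {
            disconnected.push(BlockView::new(&bytes).hash());
        }
    }
    // after all chunks are in, each disconnected hash is attached to its
    // (by then present) parent via chain.add_child(parent_hash, child_hash).
    disconnected
}
```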
+ mod bloom; mod block; mod complete; diff --git a/ethcore/src/blockchain/mod.rs b/ethcore/src/blockchain/mod.rs index 13b7c61eb..7b9ae0e60 100644 --- a/ethcore/src/blockchain/mod.rs +++ b/ethcore/src/blockchain/mod.rs @@ -26,7 +26,7 @@ mod import_route; mod update; #[cfg(test)] -mod generator; +pub mod generator; pub use self::blockchain::{BlockProvider, BlockChain}; pub use self::cache::CacheSize; diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 1aeca3974..67937ff45 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -13,7 +13,6 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . - use std::collections::{HashSet, HashMap, VecDeque}; use std::sync::{Arc, Weak}; use std::path::{Path}; @@ -62,6 +61,7 @@ use trace::FlatTransactionTraces; use evm::Factory as EvmFactory; use miner::{Miner, MinerService}; use util::TrieFactory; +use snapshot::{self, io as snapshot_io}; // re-export pub use types::blockchain_info::BlockChainInfo; @@ -119,7 +119,7 @@ pub struct Client { mode: Mode, chain: Arc, tracedb: Arc>, - engine: Arc>, + engine: Arc, db: Arc, state_db: Mutex>, block_queue: BlockQueue, @@ -139,6 +139,7 @@ pub struct Client { } const HISTORY: u64 = 1200; + // database columns /// Column for State pub const DB_COL_STATE: Option = Some(0); @@ -161,10 +162,10 @@ pub fn append_path
<P>
(path: P, item: &str) -> String where P: AsRef { } impl Client { - /// Create a new client with given spec and DB path and custom verifier. + /// Create a new client with given spec and DB path and custom verifier. pub fn new( config: ClientConfig, - spec: Spec, + spec: &Spec, path: &Path, miner: Arc, message_channel: IoChannel, @@ -191,8 +192,7 @@ impl Client { warn!("State root not found for block #{} ({})", chain.best_block_number(), chain.best_block_hash().hex()); } - let engine = Arc::new(spec.engine); - + let engine = spec.engine.clone(); let block_queue = BlockQueue::new(config.queue, engine.clone(), message_channel.clone()); let panic_handler = PanicHandler::new_in_arc(); panic_handler.forward_from(&block_queue); @@ -270,7 +270,7 @@ impl Client { } fn check_and_close_block(&self, block: &PreverifiedBlock) -> Result { - let engine = &**self.engine; + let engine = &*self.engine; let header = &block.header; // Check the block isn't so old we won't be able to enact it. @@ -593,6 +593,23 @@ impl Client { } } + /// Take a snapshot. + pub fn take_snapshot(&self, writer: W) -> Result<(), ::error::Error> { + let db = self.state_db.lock().boxed_clone(); + let best_block_number = self.chain_info().best_block_number; + let start_block_number = if best_block_number > 1000 { + best_block_number - 1000 + } else { + 0 + }; + let start_hash = self.block_hash(BlockID::Number(start_block_number)) + .expect("blocks within HISTORY are always stored."); + + try!(snapshot::take_snapshot(&self.chain, start_hash, db.as_hashdb(), writer)); + + Ok(()) + } + fn block_hash(chain: &BlockChain, id: BlockID) -> Option { match id { BlockID::Hash(hash) => Some(hash), @@ -665,7 +682,7 @@ impl BlockChainClient for Client { state.add_balance(&sender, &(needed_balance - balance)); } let options = TransactOptions { tracing: analytics.transaction_tracing, vm_tracing: analytics.vm_tracing, check_nonce: false }; - let mut ret = try!(Executive::new(&mut state, &env_info, &**self.engine, &self.vm_factory).transact(t, options)); + let mut ret = try!(Executive::new(&mut state, &env_info, &*self.engine, &self.vm_factory).transact(t, options)); // TODO gav move this into Executive. 
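The new `take_snapshot` entry point above is generic over `SnapshotWriter`, so the same code serves both the packed and loose formats. A minimal usage sketch (the output path is illustrative, and io errors are assumed to convert into `::error::Error` via the `From` impl in error.rs):

```rust
use std::path::Path;

// Write a packed, single-file snapshot of the client's recent state and blocks.
fn snapshot_to_file(client: &Client, path: &Path) -> Result<(), ::error::Error> {
    let writer = try!(PackedWriter::new(path));
    client.take_snapshot(writer)
}
```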
ret.state_diff = original_state.map(|original| state.diff_from(original)); @@ -697,7 +714,7 @@ impl BlockChainClient for Client { gas_limit: view.gas_limit(), }; for t in txs.iter().take(address.index) { - match Executive::new(&mut state, &env_info, &**self.engine, &self.vm_factory).transact(t, Default::default()) { + match Executive::new(&mut state, &env_info, &*self.engine, &self.vm_factory).transact(t, Default::default()) { Ok(x) => { env_info.gas_used = env_info.gas_used + x.gas_used; } Err(ee) => { return Err(CallError::Execution(ee)) } } @@ -705,7 +722,7 @@ impl BlockChainClient for Client { let t = &txs[address.index]; let original_state = if analytics.state_diffing { Some(state.clone()) } else { None }; - let mut ret = try!(Executive::new(&mut state, &env_info, &**self.engine, &self.vm_factory).transact(t, options)); + let mut ret = try!(Executive::new(&mut state, &env_info, &*self.engine, &self.vm_factory).transact(t, options)); ret.state_diff = original_state.map(|original| state.diff_from(original)); Ok(ret) @@ -997,7 +1014,7 @@ impl BlockChainClient for Client { impl MiningBlockChainClient for Client { fn prepare_open_block(&self, author: Address, gas_range_target: (U256, U256), extra_data: Bytes) -> OpenBlock { - let engine = &**self.engine; + let engine = &*self.engine; let h = self.chain.best_block_hash(); let mut open_block = OpenBlock::new( diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index 710fb5768..32582ddf2 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -45,7 +45,6 @@ mod traits { pub mod chain_notify { //! Chain notify interface - #![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues include!(concat!(env!("OUT_DIR"), "/chain_notify.rs")); } diff --git a/ethcore/src/error.rs b/ethcore/src/error.rs index cdb3deb38..449303732 100644 --- a/ethcore/src/error.rs +++ b/ethcore/src/error.rs @@ -23,6 +23,8 @@ use basic_types::LogBloom; use client::Error as ClientError; use ipc::binary::{BinaryConvertError, BinaryConvertable}; use types::block_import_error::BlockImportError; +use snapshot::Error as SnapshotError; + pub use types::executed::{ExecutionError, CallError}; #[derive(Debug, PartialEq, Clone)] @@ -234,25 +236,28 @@ pub enum Error { StdIo(::std::io::Error), /// Snappy error. Snappy(::util::snappy::InvalidInput), + /// Snapshot error. 
+ Snapshot(SnapshotError), } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { - Error::Client(ref err) => f.write_fmt(format_args!("{}", err)), - Error::Util(ref err) => f.write_fmt(format_args!("{}", err)), - Error::Io(ref err) => f.write_fmt(format_args!("{}", err)), - Error::Block(ref err) => f.write_fmt(format_args!("{}", err)), - Error::Execution(ref err) => f.write_fmt(format_args!("{}", err)), - Error::Transaction(ref err) => f.write_fmt(format_args!("{}", err)), - Error::Import(ref err) => f.write_fmt(format_args!("{}", err)), + Error::Client(ref err) => err.fmt(f), + Error::Util(ref err) => err.fmt(f), + Error::Io(ref err) => err.fmt(f), + Error::Block(ref err) => err.fmt(f), + Error::Execution(ref err) => err.fmt(f), + Error::Transaction(ref err) => err.fmt(f), + Error::Import(ref err) => err.fmt(f), Error::UnknownEngineName(ref name) => f.write_fmt(format_args!("Unknown engine name ({})", name)), Error::PowHashInvalid => f.write_str("Invalid or out of date PoW hash."), Error::PowInvalid => f.write_str("Invalid nonce or mishash"), - Error::Trie(ref err) => f.write_fmt(format_args!("{}", err)), - Error::StdIo(ref err) => f.write_fmt(format_args!("{}", err)), - Error::Snappy(ref err) => f.write_fmt(format_args!("{}", err)), + Error::Trie(ref err) => err.fmt(f), + Error::StdIo(ref err) => err.fmt(f), + Error::Snappy(ref err) => err.fmt(f), + Error::Snapshot(ref err) => err.fmt(f), } } } @@ -329,12 +334,6 @@ impl From<::std::io::Error> for Error { } } -impl From<::util::snappy::InvalidInput> for Error { - fn from(err: ::util::snappy::InvalidInput) -> Error { - Error::Snappy(err) - } -} - impl From for Error { fn from(err: BlockImportError) -> Error { match err { @@ -345,6 +344,23 @@ impl From for Error { } } +impl From for Error { + fn from(err: snappy::InvalidInput) -> Error { + Error::Snappy(err) + } +} + +impl From for Error { + fn from(err: SnapshotError) -> Error { + match err { + SnapshotError::Io(err) => Error::StdIo(err), + SnapshotError::Trie(err) => Error::Trie(err), + SnapshotError::Decoder(err) => err.into(), + other => Error::Snapshot(other), + } + } +} + impl From> for Error where Error: From { fn from(err: Box) -> Error { Error::from(*err) diff --git a/ethcore/src/externalities.rs b/ethcore/src/externalities.rs index 2c7ecb4e5..bfaa15d38 100644 --- a/ethcore/src/externalities.rs +++ b/ethcore/src/externalities.rs @@ -332,7 +332,7 @@ mod tests { struct TestSetup { state: GuardedTempResult, - engine: Box, + engine: Arc, sub_state: Substate, env_info: EnvInfo } diff --git a/ethcore/src/json_tests/chain.rs b/ethcore/src/json_tests/chain.rs index 58af928c4..37a70e48e 100644 --- a/ethcore/src/json_tests/chain.rs +++ b/ethcore/src/json_tests/chain.rs @@ -61,7 +61,7 @@ pub fn json_chain_test(json_data: &[u8], era: ChainEra) -> Vec { { let client = Client::new( ClientConfig::default(), - spec(&blockchain), + &spec(&blockchain), temp.as_path(), Arc::new(Miner::with_spec(spec(&blockchain))), IoChannel::disconnected() diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index e883385f6..875abb804 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -96,6 +96,7 @@ pub extern crate ethstore; extern crate semver; extern crate ethcore_ipc_nano as nanoipc; extern crate ethcore_devtools as devtools; +extern crate rand; extern crate bit_set; #[cfg(feature = "jit" )] extern crate evmjit; diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 10c2d99b5..c942416db 100644 --- a/ethcore/src/service.rs +++ 
b/ethcore/src/service.rs @@ -22,6 +22,7 @@ use spec::Spec; use error::*; use client::{Client, ClientConfig, ChainNotify}; use miner::Miner; +use snapshot::service::Service as SnapshotService; use std::sync::atomic::AtomicBool; #[cfg(feature="ipc")] @@ -38,12 +39,17 @@ pub enum ClientIoMessage { BlockVerified, /// New transaction RLPs are ready to be imported NewTransactions(Vec), + /// Feed a state chunk to the snapshot service + FeedStateChunk(H256, Bytes), + /// Feed a block chunk to the snapshot service + FeedBlockChunk(H256, Bytes), } /// Client service setup. Creates and registers client and network services with the IO subsystem. pub struct ClientService { io_service: Arc>, client: Arc, + snapshot: Arc, panic_handler: Arc, _stop_guard: ::devtools::StopGuard, } @@ -65,10 +71,17 @@ impl ClientService { if spec.fork_name.is_some() { warn!("Your chain is an alternative fork. {}", Colour::Red.bold().paint("TRANSACTIONS MAY BE REPLAYED ON THE MAINNET!")); } - let client = try!(Client::new(config, spec, db_path, miner, io_service.channel())); - panic_handler.forward_from(client.deref()); + + let pruning = config.pruning; + let client = try!(Client::new(config, &spec, db_path, miner, io_service.channel())); + let snapshot = try!(SnapshotService::new(spec, pruning, db_path.into(), io_service.channel())); + + let snapshot = Arc::new(snapshot); + + panic_handler.forward_from(&*client); let client_io = Arc::new(ClientIoHandler { - client: client.clone() + client: client.clone(), + snapshot: snapshot.clone(), }); try!(io_service.register_handler(client_io)); @@ -78,6 +91,7 @@ impl ClientService { Ok(ClientService { io_service: Arc::new(io_service), client: client, + snapshot: snapshot, panic_handler: panic_handler, _stop_guard: stop_guard, }) @@ -98,6 +112,11 @@ impl ClientService { self.client.clone() } + /// Get snapshot interface. + pub fn snapshot_service(&self) -> Arc { + self.snapshot.clone() + } + /// Get network service component pub fn io(&self) -> Arc> { self.io_service.clone() @@ -117,7 +136,8 @@ impl MayPanic for ClientService { /// IO interface for the Client handler struct ClientIoHandler { - client: Arc + client: Arc, + snapshot: Arc, } const CLIENT_TICK_TIMER: TimerToken = 0; @@ -139,6 +159,8 @@ impl IoHandler for ClientIoHandler { match *net_message { ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); } ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); } + ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => self.snapshot.feed_state_chunk(*hash, chunk), + ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => self.snapshot.feed_block_chunk(*hash, chunk), _ => {} // ignore other messages } } @@ -172,10 +194,14 @@ mod tests { #[test] fn it_can_be_started() { let temp_path = RandomTempPath::new(); + let mut path = temp_path.as_path().to_owned(); + path.push("pruning"); + path.push("db"); + let service = ClientService::start( ClientConfig::default(), get_test_spec(), - temp_path.as_path(), + &path, Arc::new(Miner::with_spec(get_test_spec())), ); assert!(service.is_ok()); diff --git a/ethcore/src/snapshot/account.rs b/ethcore/src/snapshot/account.rs index ec9566470..a74856b60 100644 --- a/ethcore/src/snapshot/account.rs +++ b/ethcore/src/snapshot/account.rs @@ -17,7 +17,7 @@ //! 
Account state encoding and decoding use account_db::{AccountDB, AccountDBMut}; -use error::Error; +use snapshot::Error; use util::{Bytes, HashDB, SHA3_EMPTY, TrieDB}; use util::hash::{FixedHash, H256}; @@ -133,39 +133,25 @@ impl Account { code_hash: code_hash, }) } + + #[cfg(test)] + pub fn storage_root_mut(&mut self) -> &mut H256 { + &mut self.storage_root + } } #[cfg(test)] mod tests { use account_db::{AccountDB, AccountDBMut}; use tests::helpers::get_temp_journal_db; + use snapshot::tests::helpers::fill_storage; use util::{SHA3_NULL_RLP, SHA3_EMPTY}; use util::hash::{Address, FixedHash, H256}; use util::rlp::{UntrustedRlp, View}; - use util::trie::{Alphabet, StandardMap, SecTrieDBMut, TrieMut, ValueMode}; use super::Account; - fn fill_storage(mut db: AccountDBMut) -> H256 { - let map = StandardMap { - alphabet: Alphabet::All, - min_key: 6, - journal_key: 6, - value_mode: ValueMode::Random, - count: 100 - }; - - let mut root = H256::new(); - { - let mut trie = SecTrieDBMut::new(&mut db, &mut root); - for (k, v) in map.make() { - trie.insert(&k, &v).unwrap(); - } - } - root - } - #[test] fn encoding_basic() { let mut db = get_temp_journal_db(); @@ -193,12 +179,16 @@ mod tests { let mut db = &mut **db; let addr = Address::random(); - let root = fill_storage(AccountDBMut::new(db.as_hashdb_mut(), &addr)); - let account = Account { - nonce: 25.into(), - balance: 987654321.into(), - storage_root: root, - code_hash: SHA3_EMPTY, + let account = { + let acct_db = AccountDBMut::new(db.as_hashdb_mut(), &addr); + let mut root = SHA3_NULL_RLP; + fill_storage(acct_db, &mut root, &mut H256::zero()); + Account { + nonce: 25.into(), + balance: 987654321.into(), + storage_root: root, + code_hash: SHA3_EMPTY, + } }; let thin_rlp = account.to_thin_rlp(); diff --git a/ethcore/src/snapshot/block.rs b/ethcore/src/snapshot/block.rs index fd034d97b..394db345e 100644 --- a/ethcore/src/snapshot/block.rs +++ b/ethcore/src/snapshot/block.rs @@ -16,9 +16,6 @@ //! Block RLP compression. -// TODO [rob] remove when BlockRebuilder done. -#![allow(dead_code)] - use block::Block; use header::Header; @@ -50,10 +47,9 @@ impl AbridgedBlock { /// producing new rlp. pub fn from_block_view(block_view: &BlockView) -> Self { let header = block_view.header_view(); - let seal_fields = header.seal(); - // 10 header fields, unknown amount of seal fields, and 2 block fields. + // 10 header fields, unknown number of seal fields, and 2 block fields. let mut stream = RlpStream::new_list( HEADER_FIELDS + seal_fields.len() + @@ -110,25 +106,17 @@ impl AbridgedBlock { let transactions = try!(rlp.val_at(10)); let uncles: Vec
<Header>
= try!(rlp.val_at(11)); - // iterator-based approach is cleaner but doesn't work w/ try. - let seal = { - let mut seal = Vec::new(); + let mut uncles_rlp = RlpStream::new(); + uncles_rlp.append(&uncles); + header.uncles_hash = uncles_rlp.as_raw().sha3(); - for i in 12..rlp.item_count() { - seal.push(try!(rlp.val_at(i))); - } + let mut seal_fields = Vec::new(); + for i in (HEADER_FIELDS + BLOCK_FIELDS)..rlp.item_count() { + let seal_rlp = try!(rlp.at(i)); + seal_fields.push(seal_rlp.as_raw().to_owned()); + } - seal - }; - - header.set_seal(seal); - - let uncle_bytes = uncles.iter() - .fold(RlpStream::new_list(uncles.len()), |mut s, u| { - s.append_raw(&u.rlp(::basic_types::Seal::With), 1); - s - }).out(); - header.uncles_hash = uncle_bytes.sha3(); + header.set_seal(seal_fields); Ok(Block { header: header, @@ -147,16 +135,10 @@ mod tests { use util::numbers::U256; use util::hash::{Address, H256, FixedHash}; - use util::{Bytes, RlpStream, Stream}; + use util::Bytes; fn encode_block(b: &Block) -> Bytes { - let mut s = RlpStream::new_list(3); - - b.header.stream_rlp(&mut s, ::basic_types::Seal::With); - s.append(&b.transactions); - s.append(&b.uncles); - - s.out() + b.rlp_bytes(::basic_types::Seal::With) } #[test] diff --git a/ethcore/src/snapshot/error.rs b/ethcore/src/snapshot/error.rs new file mode 100644 index 000000000..98f906ec5 --- /dev/null +++ b/ethcore/src/snapshot/error.rs @@ -0,0 +1,68 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Snapshot-related errors. + +use std::fmt; + +use util::H256; +use util::trie::TrieError; +use util::rlp::DecoderError; + +/// Snapshot-related errors. +#[derive(Debug)] +pub enum Error { + /// Invalid starting block for snapshot. + InvalidStartingBlock(H256), + /// Block not found. + BlockNotFound(H256), + /// Trie error. + Trie(TrieError), + /// Decoder error. + Decoder(DecoderError), + /// Io error. + Io(::std::io::Error), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Error::InvalidStartingBlock(ref hash) => write!(f, "Invalid starting block hash: {}", hash), + Error::BlockNotFound(ref hash) => write!(f, "Block not found in chain: {}", hash), + Error::Io(ref err) => err.fmt(f), + Error::Decoder(ref err) => err.fmt(f), + Error::Trie(ref err) => err.fmt(f), + } + } +} + +impl From<::std::io::Error> for Error { + fn from(err: ::std::io::Error) -> Self { + Error::Io(err) + } +} + +impl From> for Error { + fn from(err: Box) -> Self { + Error::Trie(*err) + } +} + +impl From for Error { + fn from(err: DecoderError) -> Self { + Error::Decoder(err) + } +} \ No newline at end of file diff --git a/ethcore/src/snapshot/io.rs b/ethcore/src/snapshot/io.rs new file mode 100644 index 000000000..7179b97ef --- /dev/null +++ b/ethcore/src/snapshot/io.rs @@ -0,0 +1,343 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. 
+ +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Snapshot i/o. +//! Ways of writing and reading snapshots. This module supports writing and reading +//! snapshots of two different formats: packed and loose. +//! Packed snapshots are written to a single file, and loose snapshots are +//! written to multiple files in one directory. + +use std::collections::HashMap; +use std::io::{self, Read, Seek, SeekFrom, Write}; +use std::fs::{self, File}; +use std::path::{Path, PathBuf}; + +use util::Bytes; +use util::hash::H256; +use util::rlp::{self, Encodable, RlpStream, UntrustedRlp, Stream, View}; + +use super::ManifestData; + +/// Something which can write snapshots. +/// Writing the same chunk multiple times will lead to implementation-defined +/// behavior, and is not advised. +pub trait SnapshotWriter { + /// Write a compressed state chunk. + fn write_state_chunk(&mut self, hash: H256, chunk: &[u8]) -> io::Result<()>; + + /// Write a compressed block chunk. + fn write_block_chunk(&mut self, hash: H256, chunk: &[u8]) -> io::Result<()>; + + /// Complete writing. The manifest's chunk lists must be consistent + /// with the chunks written. + fn finish(self, manifest: ManifestData) -> io::Result<()> where Self: Sized; +} + +// (hash, len, offset) +struct ChunkInfo(H256, u64, u64); + +impl Encodable for ChunkInfo { + fn rlp_append(&self, s: &mut RlpStream) { + s.begin_list(3); + s.append(&self.0).append(&self.1).append(&self.2); + } +} + +impl rlp::Decodable for ChunkInfo { + fn decode(decoder: &D) -> Result { + let d = decoder.as_rlp(); + + let hash = try!(d.val_at(0)); + let len = try!(d.val_at(1)); + let off = try!(d.val_at(2)); + Ok(ChunkInfo(hash, len, off)) + } +} + +/// A packed snapshot writer. This writes snapshots to a single concatenated file. +/// +/// The file format is very simple and consists of three parts: +/// [Concatenated chunk data] +/// [manifest as RLP] +/// [manifest start offset (8 bytes little-endian)] +/// +/// The manifest contains all the same information as a standard `ManifestData`, +/// but also maps chunk hashes to their lengths and offsets in the file +/// for easy reading. +pub struct PackedWriter { + file: File, + state_hashes: Vec, + block_hashes: Vec, + cur_len: u64, +} + +impl PackedWriter { + /// Create a new "PackedWriter", to write into the file at the given path. 
+ pub fn new(path: &Path) -> io::Result { + Ok(PackedWriter { + file: try!(File::create(path)), + state_hashes: Vec::new(), + block_hashes: Vec::new(), + cur_len: 0, + }) + } +} + +impl SnapshotWriter for PackedWriter { + fn write_state_chunk(&mut self, hash: H256, chunk: &[u8]) -> io::Result<()> { + try!(self.file.write_all(chunk)); + + let len = chunk.len() as u64; + self.state_hashes.push(ChunkInfo(hash, len, self.cur_len)); + + self.cur_len += len; + Ok(()) + } + + fn write_block_chunk(&mut self, hash: H256, chunk: &[u8]) -> io::Result<()> { + try!(self.file.write_all(chunk)); + + let len = chunk.len() as u64; + self.block_hashes.push(ChunkInfo(hash, len, self.cur_len)); + + self.cur_len += len; + Ok(()) + } + + fn finish(mut self, manifest: ManifestData) -> io::Result<()> { + // we ignore the hashes fields of the manifest under the assumption that + // they are consistent with ours. + let mut stream = RlpStream::new_list(5); + stream + .append(&self.state_hashes) + .append(&self.block_hashes) + .append(&manifest.state_root) + .append(&manifest.block_number) + .append(&manifest.block_hash); + + let manifest_rlp = stream.out(); + + try!(self.file.write_all(&manifest_rlp)); + let off = self.cur_len; + trace!(target: "snapshot_io", "writing manifest of len {} to offset {}", manifest_rlp.len(), off); + + let off_bytes: [u8; 8] = + [ + off as u8, + (off >> 8) as u8, + (off >> 16) as u8, + (off >> 24) as u8, + (off >> 32) as u8, + (off >> 40) as u8, + (off >> 48) as u8, + (off >> 56) as u8, + ]; + + try!(self.file.write_all(&off_bytes[..])); + + Ok(()) + } +} + +/// A "loose" writer writes chunk files into a directory. +pub struct LooseWriter { + dir: PathBuf, +} + +impl LooseWriter { + /// Create a new LooseWriter which will write into the given directory, + /// creating it if it doesn't exist. + pub fn new(path: PathBuf) -> io::Result { + try!(fs::create_dir_all(&path)); + + Ok(LooseWriter { + dir: path, + }) + } + + // writing logic is the same for both kinds of chunks. + fn write_chunk(&mut self, hash: H256, chunk: &[u8]) -> io::Result<()> { + let mut file_path = self.dir.clone(); + file_path.push(hash.hex()); + + let mut file = try!(File::create(file_path)); + try!(file.write_all(chunk)); + + Ok(()) + } +} + +impl SnapshotWriter for LooseWriter { + fn write_state_chunk(&mut self, hash: H256, chunk: &[u8]) -> io::Result<()> { + self.write_chunk(hash, chunk) + } + + fn write_block_chunk(&mut self, hash: H256, chunk: &[u8]) -> io::Result<()> { + self.write_chunk(hash, chunk) + } + + fn finish(self, manifest: ManifestData) -> io::Result<()> { + let rlp = manifest.into_rlp(); + let mut path = self.dir.clone(); + path.push("MANIFEST"); + + let mut file = try!(File::create(path)); + try!(file.write_all(&rlp[..])); + + Ok(()) + } +} + +/// Something which can read compressed snapshots. +pub trait SnapshotReader { + /// Get the manifest data for this snapshot. + fn manifest(&self) -> &ManifestData; + + /// Get raw chunk data by hash. implementation defined behavior + /// if a chunk not in the manifest is requested. + fn chunk(&self, hash: H256) -> io::Result; +} + +/// Packed snapshot reader. +pub struct PackedReader { + file: File, + state_hashes: HashMap, // len, offset + block_hashes: HashMap, // len, offset + manifest: ManifestData, +} + +impl PackedReader { + /// Create a new `PackedReader` for the file at the given path. + /// This will fail if any io errors are encountered or the file + /// is not a valid packed snapshot. 
+ pub fn new(path: &Path) -> Result, ::error::Error> { + let mut file = try!(File::open(path)); + let file_len = try!(file.metadata()).len(); + if file_len < 8 { + // ensure we don't seek before beginning. + return Ok(None); + } + + + try!(file.seek(SeekFrom::End(-8))); + let mut off_bytes = [0u8; 8]; + + try!(file.read_exact(&mut off_bytes[..])); + + let manifest_off: u64 = + ((off_bytes[7] as u64) << 56) + + ((off_bytes[6] as u64) << 48) + + ((off_bytes[5] as u64) << 40) + + ((off_bytes[4] as u64) << 32) + + ((off_bytes[3] as u64) << 24) + + ((off_bytes[2] as u64) << 16) + + ((off_bytes[1] as u64) << 8) + + (off_bytes[0] as u64); + + let manifest_len = file_len - manifest_off - 8; + trace!(target: "snapshot", "loading manifest of length {} from offset {}", manifest_len, manifest_off); + + let mut manifest_buf = vec![0; manifest_len as usize]; + + try!(file.seek(SeekFrom::Start(manifest_off))); + try!(file.read_exact(&mut manifest_buf)); + + let rlp = UntrustedRlp::new(&manifest_buf); + + let state: Vec = try!(rlp.val_at(0)); + let blocks: Vec = try!(rlp.val_at(1)); + + let manifest = ManifestData { + state_hashes: state.iter().map(|c| c.0).collect(), + block_hashes: blocks.iter().map(|c| c.0).collect(), + state_root: try!(rlp.val_at(2)), + block_number: try!(rlp.val_at(3)), + block_hash: try!(rlp.val_at(4)), + }; + + Ok(Some(PackedReader { + file: file, + state_hashes: state.into_iter().map(|c| (c.0, (c.1, c.2))).collect(), + block_hashes: blocks.into_iter().map(|c| (c.0, (c.1, c.2))).collect(), + manifest: manifest + })) + } +} + +impl SnapshotReader for PackedReader { + fn manifest(&self) -> &ManifestData { + &self.manifest + } + + fn chunk(&self, hash: H256) -> io::Result { + let &(len, off) = self.state_hashes.get(&hash).or_else(|| self.block_hashes.get(&hash)) + .expect("only chunks in the manifest can be requested; qed"); + + let mut file = &self.file; + + try!(file.seek(SeekFrom::Start(off))); + let mut buf = vec![0; len as usize]; + + try!(file.read_exact(&mut buf[..])); + + Ok(buf) + } +} + +/// reader for "loose" snapshots +pub struct LooseReader { + dir: PathBuf, + manifest: ManifestData, +} + +impl LooseReader { + /// Create a new `LooseReader` which will read the manifest and chunk data from + /// the given directory. + pub fn new(mut dir: PathBuf) -> Result { + let mut manifest_buf = Vec::new(); + + dir.push("MANIFEST"); + let mut manifest_file = try!(File::open(&dir)); + try!(manifest_file.read_to_end(&mut manifest_buf)); + + let manifest = try!(ManifestData::from_rlp(&manifest_buf[..])); + + dir.pop(); + + Ok(LooseReader { + dir: dir, + manifest: manifest, + }) + } +} + +impl SnapshotReader for LooseReader { + fn manifest(&self) -> &ManifestData { + &self.manifest + } + + fn chunk(&self, hash: H256) -> io::Result { + let mut path = self.dir.clone(); + path.push(hash.hex()); + + let mut buf = Vec::new(); + let mut file = try!(File::open(&path)); + + try!(file.read_to_end(&mut buf)); + + Ok(buf) + } +} \ No newline at end of file diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index d9cfde09c..5784ed936 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -14,123 +14,120 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -//! Snapshot creation helpers. +//! Snapshot creation, restoration, and network service. 
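A note on the packed format defined in io.rs above: the trailer that `PackedWriter::finish` writes and `PackedReader::new` parses is simply the manifest's start offset as a little-endian `u64`. The hand-rolled shifts are equivalent to this round-trip (a standalone sketch, not code from the patch):

```rust
// Manifest offset -> 8 little-endian bytes (low byte first).
fn offset_to_bytes(off: u64) -> [u8; 8] {
    let mut out = [0u8; 8];
    for (i, byte) in out.iter_mut().enumerate() {
        *byte = (off >> (8 * i)) as u8;
    }
    out
}

// 8 little-endian bytes -> manifest offset; the inverse of the above.
fn offset_from_bytes(bytes: &[u8; 8]) -> u64 {
    bytes.iter().enumerate().fold(0u64, |acc, (i, &b)| acc | ((b as u64) << (8 * i)))
}
```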
use std::collections::VecDeque; -use std::fs::{create_dir_all, File}; -use std::io::Write; -use std::path::{Path, PathBuf}; +use std::sync::Arc; use account_db::{AccountDB, AccountDBMut}; -use client::BlockChainClient; -use error::Error; -use ids::BlockID; -use views::{BlockView, HeaderView}; +use blockchain::{BlockChain, BlockProvider}; +use engines::Engine; +use views::BlockView; -use util::{Bytes, Hashable, HashDB, JournalDB, snappy, TrieDB, TrieDBMut, TrieMut, DBTransaction}; -use util::error::UtilError; +use util::{Bytes, Hashable, HashDB, snappy, TrieDB, TrieDBMut, TrieMut}; +use util::Mutex; use util::hash::{FixedHash, H256}; +use util::journaldb::{self, Algorithm, JournalDB}; +use util::kvdb::Database; use util::rlp::{DecoderError, RlpStream, Stream, UntrustedRlp, View, Compressible, RlpType}; +use util::rlp::SHA3_NULL_RLP; use self::account::Account; use self::block::AbridgedBlock; +use self::io::SnapshotWriter; use crossbeam::{scope, ScopedJoinHandle}; +use rand::{Rng, OsRng}; + +pub use self::error::Error; +pub use self::service::{RestorationStatus, Service, SnapshotService}; + +pub mod io; +pub mod service; mod account; mod block; +mod error; -// Try to have chunks be around 16MB (before compression) -const PREFERRED_CHUNK_SIZE: usize = 16 * 1024 * 1024; +#[cfg(test)] +mod tests; -/// Take a snapshot using the given client and database, writing into `path`. -pub fn take_snapshot(client: &BlockChainClient, mut path: PathBuf, state_db: &HashDB) -> Result<(), Error> { - let chain_info = client.chain_info(); +// Try to have chunks be around 4MB (before compression) +const PREFERRED_CHUNK_SIZE: usize = 4 * 1024 * 1024; - let genesis_hash = chain_info.genesis_hash; - let best_header_raw = client.best_block_header(); - let best_header = HeaderView::new(&best_header_raw); - let state_root = best_header.state_root(); +// How many blocks to include in a snapshot, starting from the head of the chain. +const SNAPSHOT_BLOCKS: u64 = 30000; - trace!(target: "snapshot", "Taking snapshot starting at block {}", best_header.number()); +/// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer. 
+pub fn take_snapshot(chain: &BlockChain, start_block_hash: H256, state_db: &HashDB, writer: W) -> Result<(), Error> { + let start_header = try!(chain.block_header(&start_block_hash).ok_or(Error::InvalidStartingBlock(start_block_hash))); + let state_root = start_header.state_root(); + let number = start_header.number(); - let _ = create_dir_all(&path); + info!("Taking snapshot starting at block {}", number); - let state_hashes = try!(chunk_state(state_db, &state_root, &path)); - let block_hashes = try!(chunk_blocks(client, best_header.hash(), genesis_hash, &path)); + let writer = Mutex::new(writer); + let (state_hashes, block_hashes) = try!(scope(|scope| { + let block_guard = scope.spawn(|| chunk_blocks(chain, (number, start_block_hash), &writer)); + let state_res = chunk_state(state_db, state_root, &writer); - trace!(target: "snapshot", "produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len()); + state_res.and_then(|state_hashes| { + block_guard.join().map(|block_hashes| (state_hashes, block_hashes)) + }) + })); + + info!("produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len()); let manifest_data = ManifestData { state_hashes: state_hashes, block_hashes: block_hashes, - state_root: state_root, - block_number: chain_info.best_block_number, - block_hash: chain_info.best_block_hash, + state_root: *state_root, + block_number: number, + block_hash: start_block_hash, }; - path.push("MANIFEST"); - - let mut manifest_file = try!(File::create(&path)); - - try!(manifest_file.write_all(&manifest_data.into_rlp())); + try!(writer.into_inner().finish(manifest_data)); Ok(()) } -// shared portion of write_chunk -// returns either a (hash, compressed_size) pair or an io error. -fn write_chunk(raw_data: &[u8], compression_buffer: &mut Vec, path: &Path) -> Result<(H256, usize), Error> { - let compressed_size = snappy::compress_into(raw_data, compression_buffer); - let compressed = &compression_buffer[..compressed_size]; - let hash = compressed.sha3(); - - let mut file_path = path.to_owned(); - file_path.push(hash.hex()); - - let mut file = try!(File::create(file_path)); - try!(file.write_all(compressed)); - - Ok((hash, compressed_size)) -} - /// Used to build block chunks. struct BlockChunker<'a> { - client: &'a BlockChainClient, + chain: &'a BlockChain, // block, receipt rlp pairs. rlps: VecDeque, current_hash: H256, hashes: Vec, snappy_buffer: Vec, + writer: &'a Mutex, } impl<'a> BlockChunker<'a> { // Repeatedly fill the buffers and writes out chunks, moving backwards from starting block hash. - // Loops until we reach the genesis, and writes out the remainder. - fn chunk_all(&mut self, genesis_hash: H256, path: &Path) -> Result<(), Error> { + // Loops until we reach the first desired block, and writes out the remainder. 
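// Resulting chunk layout (assembled in write_chunk below): each chunk is one
// RLP list of the form
//   [parent_number, parent_hash, parent_total_difficulty, (abridged_block, receipts), ...]
// so the rebuilder can recover block numbers and total difficulties even when
// the parent block itself is not restored locally.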
+ fn chunk_all(&mut self, first_hash: H256) -> Result<(), Error> { let mut loaded_size = 0; - while self.current_hash != genesis_hash { - let block = self.client.block(BlockID::Hash(self.current_hash)) - .expect("started from the head of chain and walking backwards; client stores full chain; qed"); + while self.current_hash != first_hash { + let (block, receipts) = try!(self.chain.block(&self.current_hash) + .and_then(|b| self.chain.block_receipts(&self.current_hash).map(|r| (b, r))) + .ok_or(Error::BlockNotFound(self.current_hash))); + let view = BlockView::new(&block); let abridged_rlp = AbridgedBlock::from_block_view(&view).into_inner(); - let receipts = self.client.block_receipts(&self.current_hash) - .expect("started from head of chain and walking backwards; client stores full chain; qed"); - let pair = { let mut pair_stream = RlpStream::new_list(2); - pair_stream.append(&abridged_rlp).append(&receipts); + pair_stream.append_raw(&abridged_rlp, 1).append(&receipts); pair_stream.out() }; let new_loaded_size = loaded_size + pair.len(); - // cut off the chunk if too large + // cut off the chunk if too large. + if new_loaded_size > PREFERRED_CHUNK_SIZE { - let header = view.header_view(); - try!(self.write_chunk(header.parent_hash(), header.number(), path)); + try!(self.write_chunk()); loaded_size = pair.len(); } else { loaded_size = new_loaded_size; @@ -141,25 +138,44 @@ impl<'a> BlockChunker<'a> { } if loaded_size != 0 { - // we don't store the genesis block, so once we get to this point, - // the "first" block will be number 1. - try!(self.write_chunk(genesis_hash, 1, path)); + // we don't store the first block, so once we get to this point, + // the "first" block will be first_number + 1. + try!(self.write_chunk()); } Ok(()) } // write out the data in the buffers to a chunk on disk - fn write_chunk(&mut self, parent_hash: H256, number: u64, path: &Path) -> Result<(), Error> { + // + // we preface each chunk with the parent of the first block's details. + fn write_chunk(&mut self) -> Result<(), Error> { + // since the block we're inspecting now doesn't go into the + // chunk if it's too large, the current hash is the parent hash + // for the first block in that chunk. + let parent_hash = self.current_hash; + trace!(target: "snapshot", "prepared block chunk with {} blocks", self.rlps.len()); - let mut rlp_stream = RlpStream::new_list(self.rlps.len() + 2); - rlp_stream.append(&parent_hash).append(&number); + let (parent_number, parent_details) = try!(self.chain.block_number(&parent_hash) + .and_then(|n| self.chain.block_details(&parent_hash).map(|d| (n, d))) + .ok_or(Error::BlockNotFound(parent_hash))); + + let parent_total_difficulty = parent_details.total_difficulty; + + let mut rlp_stream = RlpStream::new_list(3 + self.rlps.len()); + rlp_stream.append(&parent_number).append(&parent_hash).append(&parent_total_difficulty); + for pair in self.rlps.drain(..) { rlp_stream.append_raw(&pair, 1); } let raw_data = rlp_stream.out(); - let (hash, size) = try!(write_chunk(&raw_data, &mut self.snappy_buffer, path)); + + let size = snappy::compress_into(&raw_data, &mut self.snappy_buffer); + let compressed = &self.snappy_buffer[..size]; + let hash = compressed.sha3(); + + try!(self.writer.lock().write_block_chunk(hash, compressed)); trace!(target: "snapshot", "wrote block chunk. 
hash: {}, size: {}, uncompressed size: {}", hash.hex(), size, raw_data.len()); self.hashes.push(hash); @@ -172,16 +188,29 @@ impl<'a> BlockChunker<'a> { /// /// The path parameter is the directory to store the block chunks in. /// This function assumes the directory exists already. -pub fn chunk_blocks(client: &BlockChainClient, best_block_hash: H256, genesis_hash: H256, path: &Path) -> Result, Error> { - let mut chunker = BlockChunker { - client: client, - rlps: VecDeque::new(), - current_hash: best_block_hash, - hashes: Vec::new(), - snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)], +/// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis. +pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_block_info: (u64, H256), writer: &Mutex) -> Result, Error> { + let (start_number, start_hash) = start_block_info; + + let first_hash = if start_number < SNAPSHOT_BLOCKS { + // use the genesis hash. + chain.genesis_hash() + } else { + let first_num = start_number - SNAPSHOT_BLOCKS; + chain.block_hash(first_num) + .expect("number before best block number; whole chain is stored; qed") }; - try!(chunker.chunk_all(genesis_hash, path)); + let mut chunker = BlockChunker { + chain: chain, + rlps: VecDeque::new(), + current_hash: start_hash, + hashes: Vec::new(), + snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)], + writer: writer, + }; + + try!(chunker.chunk_all(first_hash)); Ok(chunker.hashes) } @@ -191,8 +220,8 @@ struct StateChunker<'a> { hashes: Vec, rlps: Vec, cur_size: usize, - snapshot_path: &'a Path, snappy_buffer: Vec, + writer: &'a Mutex, } impl<'a> StateChunker<'a> { @@ -226,7 +255,12 @@ impl<'a> StateChunker<'a> { } let raw_data = stream.out(); - let (hash, compressed_size) = try!(write_chunk(&raw_data, &mut self.snappy_buffer, self.snapshot_path)); + + let compressed_size = snappy::compress_into(&raw_data, &mut self.snappy_buffer); + let compressed = &self.snappy_buffer[..compressed_size]; + let hash = compressed.sha3(); + + try!(self.writer.lock().write_state_chunk(hash, compressed)); trace!(target: "snapshot", "wrote state chunk. size: {}, uncompressed size: {}", compressed_size, raw_data.len()); self.hashes.push(hash); @@ -241,21 +275,21 @@ impl<'a> StateChunker<'a> { /// /// Returns a list of hashes of chunks created, or any error it may /// have encountered. -pub fn chunk_state(db: &HashDB, root: &H256, path: &Path) -> Result, Error> { - let account_view = try!(TrieDB::new(db, &root)); +pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex) -> Result, Error> { + let account_trie = try!(TrieDB::new(db, &root)); let mut chunker = StateChunker { hashes: Vec::new(), rlps: Vec::new(), cur_size: 0, - snapshot_path: path, snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)], + writer: writer, }; trace!(target: "snapshot", "beginning state chunking"); // account_key here is the address' hash. - for (account_key, account_data) in account_view.iter() { + for (account_key, account_data) in account_trie.iter() { let account = Account::from_thin_rlp(account_data); let account_key_hash = H256::from_slice(&account_key); @@ -274,6 +308,7 @@ pub fn chunk_state(db: &HashDB, root: &H256, path: &Path) -> Result, E } /// Manifest data. +#[derive(Debug, Clone, PartialEq, Eq)] pub struct ManifestData { /// List of state chunk hashes. 
pub state_hashes: Vec, @@ -324,63 +359,60 @@ impl ManifestData { pub struct StateRebuilder { db: Box, state_root: H256, - snappy_buffer: Vec } impl StateRebuilder { /// Create a new state rebuilder to write into the given backing DB. - pub fn new(db: Box) -> Self { + pub fn new(db: Arc, pruning: Algorithm) -> Self { StateRebuilder { - db: db, - state_root: H256::zero(), - snappy_buffer: Vec::new(), + db: journaldb::new(db.clone(), pruning, ::client::DB_COL_STATE), + state_root: SHA3_NULL_RLP, } } - /// Feed a compressed state chunk into the rebuilder. - pub fn feed(&mut self, compressed: &[u8]) -> Result<(), Error> { - let len = try!(snappy::decompress_into(compressed, &mut self.snappy_buffer)); - let rlp = UntrustedRlp::new(&self.snappy_buffer[..len]); + /// Feed an uncompressed state chunk into the rebuilder. + pub fn feed(&mut self, chunk: &[u8]) -> Result<(), ::error::Error> { + let rlp = UntrustedRlp::new(chunk); let account_fat_rlps: Vec<_> = rlp.iter().map(|r| r.as_raw()).collect(); let mut pairs = Vec::with_capacity(rlp.item_count()); + let backing = self.db.backing().clone(); // initialize the pairs vector with empty values so we have slots to write into. - for _ in 0..rlp.item_count() { - pairs.push((H256::new(), Vec::new())); - } + pairs.resize(rlp.item_count(), (H256::new(), Vec::new())); - let chunk_size = account_fat_rlps.len() / ::num_cpus::get(); + let chunk_size = account_fat_rlps.len() / ::num_cpus::get() + 1; // build account tries in parallel. try!(scope(|scope| { let mut handles = Vec::new(); for (account_chunk, out_pairs_chunk) in account_fat_rlps.chunks(chunk_size).zip(pairs.chunks_mut(chunk_size)) { let mut db = self.db.boxed_clone(); - let handle: ScopedJoinHandle> = scope.spawn(move || { + let handle: ScopedJoinHandle, ::error::Error>> = scope.spawn(move || { try!(rebuild_account_trie(db.as_hashdb_mut(), account_chunk, out_pairs_chunk)); - // commit the db changes we made in this thread. - let batch = DBTransaction::new(&db.backing()); - try!(db.commit(&batch, 0, &H256::zero(), None)); - try!(db.backing().write(batch).map_err(UtilError::SimpleString)); - - Ok(()) + trace!(target: "snapshot", "thread rebuilt {} account tries", account_chunk.len()); + Ok(db) }); handles.push(handle); } - // see if we got any errors. + // commit all account tries to the db, but only in this thread. 
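// Each worker thread above wrote into an in-memory clone of the journaldb;
// `inject` (as used here) drains those pending writes into a single batch
// against the shared backing database, so disk is only touched from this thread.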
+ let batch = backing.transaction(); for handle in handles { - try!(handle.join()); + let mut thread_db = try!(handle.join()); + try!(thread_db.inject(&batch)); } + try!(backing.write(batch).map_err(::util::UtilError::SimpleString)); - Ok::<_, Error>(()) + + Ok::<_, ::error::Error>(()) })); + // batch trie writes { - let mut account_trie = if self.state_root != H256::zero() { + let mut account_trie = if self.state_root != SHA3_NULL_RLP { try!(TrieDBMut::from_existing(self.db.as_hashdb_mut(), &mut self.state_root)) } else { TrieDBMut::new(self.db.as_hashdb_mut(), &mut self.state_root) @@ -391,9 +423,10 @@ impl StateRebuilder { } } - let batch = DBTransaction::new(self.db.backing()); - try!(self.db.commit(&batch, 0, &H256::zero(), None)); - try!(self.db.backing().write(batch).map_err(|e| Error::Util(e.into()))); + let batch = backing.transaction(); + try!(self.db.inject(&batch)); + try!(backing.write(batch).map_err(::util::UtilError::SimpleString)); + trace!(target: "snapshot", "current state root: {:?}", self.state_root); Ok(()) } @@ -401,7 +434,7 @@ impl StateRebuilder { pub fn state_root(&self) -> H256 { self.state_root } } -fn rebuild_account_trie(db: &mut HashDB, account_chunk: &[&[u8]], out_chunk: &mut [(H256, Bytes)]) -> Result<(), Error> { +fn rebuild_account_trie(db: &mut HashDB, account_chunk: &[&[u8]], out_chunk: &mut [(H256, Bytes)]) -> Result<(), ::error::Error> { for (account_pair, out) in account_chunk.into_iter().zip(out_chunk) { let account_rlp = UntrustedRlp::new(account_pair); @@ -410,7 +443,7 @@ fn rebuild_account_trie(db: &mut HashDB, account_chunk: &[&[u8]], out_chunk: &mu let fat_rlp = UntrustedRlp::new(&decompressed[..]); let thin_rlp = { - let mut acct_db = AccountDBMut::from_hash(db.as_hashdb_mut(), hash); + let mut acct_db = AccountDBMut::from_hash(db, hash); // fill out the storage trie and code while decoding. let acc = try!(Account::from_fat_rlp(&mut acct_db, fat_rlp)); @@ -422,3 +455,98 @@ fn rebuild_account_trie(db: &mut HashDB, account_chunk: &[&[u8]], out_chunk: &mu } Ok(()) } + +/// Proportion of blocks which we will verify PoW for. +const POW_VERIFY_RATE: f32 = 0.02; + +/// Rebuilds the blockchain from chunks. +/// +/// Does basic verification for all blocks, but PoW verification for some. +/// Blocks must be fed in-order. +/// +/// The first block in every chunk is disconnected from the last block in the +/// chunk before it, as chunks may be submitted out-of-order. +/// +/// After all chunks have been submitted, we "glue" the chunks together. +pub struct BlockRebuilder { + chain: BlockChain, + rng: OsRng, + disconnected: Vec<(u64, H256)>, + best_number: u64, +} + +impl BlockRebuilder { + /// Create a new BlockRebuilder. + pub fn new(chain: BlockChain, best_number: u64) -> Result<Self, Error> { + Ok(BlockRebuilder { + chain: chain, + rng: try!(OsRng::new()), + disconnected: Vec::new(), + best_number: best_number, + }) + } + + /// Feed the rebuilder an uncompressed block chunk. + /// Returns the number of blocks fed or any errors. + pub fn feed(&mut self, chunk: &[u8], engine: &Engine) -> Result<u64, Error> { + use basic_types::Seal::With; + use util::U256; + + let rlp = UntrustedRlp::new(chunk); + let item_count = rlp.item_count(); + + trace!(target: "snapshot", "restoring block chunk with {} blocks.", item_count - 3); + + // todo: assert here that these values are consistent with chunks being in order.
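POW_VERIFY_RATE above means only about 2% of restored blocks receive the expensive seal check; the rest get the cheap structural check. A self-contained sketch of that sampling gate (a tiny xorshift generator stands in for the PR's `OsRng`):

```rust
// Sketch: run an expensive check for a random ~2% of items and a cheap
// one for the rest, as `BlockRebuilder::feed` does for PoW seals.
struct XorShift(u64); // seed must be non-zero.

impl XorShift {
    fn next_f32(&mut self) -> f32 {
        self.0 ^= self.0 << 13;
        self.0 ^= self.0 >> 7;
        self.0 ^= self.0 << 17;
        // top 24 bits scaled into [0, 1).
        (self.0 >> 40) as f32 / (1u64 << 24) as f32
    }
}

const VERIFY_RATE: f32 = 0.02;

fn verify_all(items: &[&str], rng: &mut XorShift) {
    for item in items {
        if rng.next_f32() <= VERIFY_RATE {
            // stand-in for the expensive seal verification.
            assert!(!item.is_empty(), "full verification failed");
        } else {
            // stand-in for the cheap structural verification.
            assert!(item.is_ascii(), "basic verification failed");
        }
    }
}
```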
+ let mut cur_number = try!(rlp.val_at::<u64>(0)) + 1; + let mut parent_hash = try!(rlp.val_at::<H256>(1)); + let parent_total_difficulty = try!(rlp.val_at::<U256>(2)); + + for idx in 3..item_count { + let pair = try!(rlp.at(idx)); + let abridged_rlp = try!(pair.at(0)).as_raw().to_owned(); + let abridged_block = AbridgedBlock::from_raw(abridged_rlp); + let receipts: Vec<::receipt::Receipt> = try!(pair.val_at(1)); + let block = try!(abridged_block.to_block(parent_hash, cur_number)); + let block_bytes = block.rlp_bytes(With); + + if self.rng.gen::<f32>() <= POW_VERIFY_RATE { + try!(engine.verify_block_seal(&block.header)) + } else { + try!(engine.verify_block_basic(&block.header, Some(&block_bytes))); + } + + let is_best = cur_number == self.best_number; + + // special-case the first block in each chunk. + if idx == 3 { + if self.chain.insert_snapshot_block(&block_bytes, receipts, Some(parent_total_difficulty), is_best) { + self.disconnected.push((cur_number, block.header.hash())); + } + } else { + self.chain.insert_snapshot_block(&block_bytes, receipts, None, is_best); + } + self.chain.commit(); + + parent_hash = BlockView::new(&block_bytes).hash(); + cur_number += 1; + } + + Ok(item_count as u64 - 3) + } + + /// Glue together any disconnected chunks. To be called at the end. + pub fn glue_chunks(&mut self) { + for &(ref first_num, ref first_hash) in &self.disconnected { + let parent_num = first_num - 1; + + // check if the parent is even in the chain. + // since we don't restore every single block in the chain, + // the first block of the first chunk has nothing to connect to. + if let Some(parent_hash) = self.chain.block_hash(parent_num) { + // if so, add the child to it. + self.chain.add_child(parent_hash, *first_hash); + } + } + } +} diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs new file mode 100644 index 000000000..d01c00f68 --- /dev/null +++ b/ethcore/src/snapshot/service.rs @@ -0,0 +1,435 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see <http://www.gnu.org/licenses/>. + +//! Snapshot network service implementation. + +use std::collections::HashSet; +use std::io::ErrorKind; +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use super::{ManifestData, StateRebuilder, BlockRebuilder}; +use super::io::{SnapshotReader, LooseReader}; + +use blockchain::BlockChain; +use engines::Engine; +use error::Error; +use service::ClientIoMessage; +use spec::Spec; + +use io::IoChannel; + +use util::{Bytes, H256, Mutex, UtilError}; +use util::journaldb::Algorithm; +use util::kvdb::{Database, DatabaseConfig}; +use util::snappy; + +/// Statuses for restorations. +#[derive(PartialEq, Clone, Copy, Debug)] +pub enum RestorationStatus { + /// No restoration. + Inactive, + /// Ongoing restoration. + Ongoing, + /// Failed restoration. + Failed, +} + +/// Restoration info. + +/// The interface for a snapshot network service.
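`glue_chunks` works because each chunk records its parent's number and hash: once every chunk has been fed, each chunk-boundary block can be linked to a parent restored from some other chunk. A std-only sketch with `u64` stand-ins for block hashes:

```rust
use std::collections::HashMap;

// Sketch: reconnect chunk-boundary blocks to their parents once all
// chunks have arrived. `number_to_hash` plays the role of the chain's
// block-number index; a boundary whose parent was never restored (the
// very first chunk) is simply left alone.
fn glue(number_to_hash: &HashMap<u64, u64>, disconnected: &[(u64, u64)]) -> Vec<(u64, u64)> {
    let mut links = Vec::new();
    for &(first_num, first_hash) in disconnected {
        if first_num == 0 { continue; } // genesis has no parent.
        if let Some(&parent_hash) = number_to_hash.get(&(first_num - 1)) {
            links.push((parent_hash, first_hash)); // parent -> child edge.
        }
    }
    links
}
```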
+/// This handles: +/// - restoration of snapshots to temporary databases. +/// - responding to queries for snapshot manifests and chunks +pub trait SnapshotService { + /// Query the most recent manifest data. + fn manifest(&self) -> Option<ManifestData>; + + /// Get raw chunk for a given hash. + fn chunk(&self, hash: H256) -> Option<Bytes>; + + /// Ask the snapshot service for the restoration status. + fn status(&self) -> RestorationStatus; + + /// Ask the snapshot service for the number of chunks completed. + /// Return a tuple of (state_chunks, block_chunks). + /// Undefined when not restoring. + fn chunks_done(&self) -> (usize, usize); + + /// Begin snapshot restoration. + /// If a restoration is already in progress, this will reset it. + /// From this point on, any previous snapshot may become unavailable. + /// Returns true if successful, false otherwise. + fn begin_restore(&self, manifest: ManifestData) -> bool; + + /// Feed a raw state chunk to the service to be processed asynchronously. + /// no-op if not currently restoring. + fn restore_state_chunk(&self, hash: H256, chunk: Bytes); + + /// Feed a raw block chunk to the service to be processed asynchronously. + /// no-op if not currently restoring. + fn restore_block_chunk(&self, hash: H256, chunk: Bytes); +} + +/// State restoration manager. +struct Restoration { + state_chunks_left: HashSet<H256>, + block_chunks_left: HashSet<H256>, + state: StateRebuilder, + blocks: BlockRebuilder, + snappy_buffer: Bytes, + final_state_root: H256, +} + +impl Restoration { + // make a new restoration, building databases in the given path. + fn new(manifest: &ManifestData, pruning: Algorithm, path: &Path, spec: &Spec) -> Result<Self, Error> { + let cfg = DatabaseConfig::with_columns(::client::DB_NO_OF_COLUMNS); + let raw_db = Arc::new(try!(Database::open(&cfg, &*path.to_string_lossy()) + .map_err(|s| UtilError::SimpleString(s)))); + + let chain = BlockChain::new(Default::default(), &spec.genesis_block(), raw_db.clone()); + let blocks = try!(BlockRebuilder::new(chain, manifest.block_number)); + + Ok(Restoration { + state_chunks_left: manifest.state_hashes.iter().cloned().collect(), + block_chunks_left: manifest.block_hashes.iter().cloned().collect(), + state: StateRebuilder::new(raw_db, pruning), + blocks: blocks, + snappy_buffer: Vec::new(), + final_state_root: manifest.state_root, + }) + } + + // feeds a state chunk + fn feed_state(&mut self, hash: H256, chunk: &[u8]) -> Result<(), Error> { + use util::trie::TrieError; + + if self.state_chunks_left.remove(&hash) { + let len = try!(snappy::decompress_into(&chunk, &mut self.snappy_buffer)); + try!(self.state.feed(&self.snappy_buffer[..len])); + + if self.state_chunks_left.is_empty() { + let root = self.state.state_root(); + if root != self.final_state_root { + warn!("Final restored state has wrong state root: expected {:?}, got {:?}", self.final_state_root, root); + return Err(TrieError::InvalidStateRoot(root).into()); + } + } + } + + Ok(()) + } + + // feeds a block chunk + fn feed_blocks(&mut self, hash: H256, chunk: &[u8], engine: &Engine) -> Result<(), Error> { + if self.block_chunks_left.remove(&hash) { + let len = try!(snappy::decompress_into(&chunk, &mut self.snappy_buffer)); + try!(self.blocks.feed(&self.snappy_buffer[..len], engine)); + + if self.block_chunks_left.is_empty() { + // connect out-of-order chunks. + self.blocks.glue_chunks(); + } + } + + Ok(()) + } + + // is everything done? + fn is_done(&self) -> bool { + self.block_chunks_left.is_empty() && self.state_chunks_left.is_empty() + } +} + +/// Type alias for client io channel.
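The `Restoration` bookkeeping reduces to two pending-hash sets: feeding a chunk removes its hash, duplicate or unknown hashes are no-ops, and the restoration is complete when both sets drain. A reduced sketch (`u64` hashes for brevity):

```rust
use std::collections::HashSet;

// Sketch of the chunk-tracking in `Restoration`: each hash is consumed
// at most once, and completion is just both sets being empty.
struct Tracker {
    state_left: HashSet<u64>,
    block_left: HashSet<u64>,
}

impl Tracker {
    // returns true if this chunk was still pending (i.e. should be processed).
    fn feed(&mut self, hash: u64, is_state: bool) -> bool {
        if is_state { self.state_left.remove(&hash) } else { self.block_left.remove(&hash) }
    }

    fn is_done(&self) -> bool {
        self.state_left.is_empty() && self.block_left.is_empty()
    }
}
```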
+pub type Channel = IoChannel<ClientIoMessage>; + +/// Service implementation. +/// +/// This will replace the client's state DB as soon as the last state chunk +/// is fed, and will replace the client's blocks DB when the last block chunk +/// is fed. +pub struct Service { + restoration: Mutex<Option<Restoration>>, + client_db: PathBuf, // "//db" + db_path: PathBuf, // "/" + io_channel: Channel, + pruning: Algorithm, + status: Mutex<RestorationStatus>, + reader: Option<LooseReader>, + spec: Spec, + state_chunks: AtomicUsize, + block_chunks: AtomicUsize, +} + +impl Service { + /// Create a new snapshot service. + pub fn new(spec: Spec, pruning: Algorithm, client_db: PathBuf, io_channel: Channel) -> Result<Self, Error> { + let db_path = try!(client_db.parent().and_then(Path::parent) + .ok_or_else(|| UtilError::SimpleString("Failed to find database root.".into()))).to_owned(); + + let reader = { + let mut snapshot_path = db_path.clone(); + snapshot_path.push("snapshot"); + + LooseReader::new(snapshot_path).ok() + }; + + let service = Service { + restoration: Mutex::new(None), + client_db: client_db, + db_path: db_path, + io_channel: io_channel, + pruning: pruning, + status: Mutex::new(RestorationStatus::Inactive), + reader: reader, + spec: spec, + state_chunks: AtomicUsize::new(0), + block_chunks: AtomicUsize::new(0), + }; + + // create the snapshot dir if it doesn't exist. + match fs::create_dir_all(service.snapshot_dir()) { + Err(e) => { + if e.kind() != ErrorKind::AlreadyExists { + return Err(e.into()) + } + } + _ => {} + } + + // delete the temporary restoration dir if it does exist. + match fs::remove_dir_all(service.restoration_dir()) { + Err(e) => { + if e.kind() != ErrorKind::NotFound { + return Err(e.into()) + } + } + _ => {} + } + + Ok(service) + } + + // get the snapshot path. + fn snapshot_dir(&self) -> PathBuf { + let mut dir = self.db_path.clone(); + dir.push("snapshot"); + dir + } + + // get the restoration directory. + fn restoration_dir(&self) -> PathBuf { + let mut dir = self.snapshot_dir(); + dir.push("restoration"); + dir + } + + // restoration db path. + fn restoration_db(&self) -> PathBuf { + let mut dir = self.restoration_dir(); + dir.push("db"); + dir + } + + // replace the client's database with our own. + fn replace_client_db(&self) -> Result<(), Error> { + let our_db = self.restoration_db(); + + trace!(target: "snapshot", "replacing {:?} with {:?}", self.client_db, our_db); + + let mut backup_db = self.restoration_dir(); + backup_db.push("backup_db"); + + let _ = fs::remove_dir_all(&backup_db); + + let existed = match fs::rename(&self.client_db, &backup_db) { + Ok(_) => true, + Err(e) => if let ErrorKind::NotFound = e.kind() { + false + } else { + return Err(e.into()); + } + }; + + match fs::rename(&our_db, &self.client_db) { + Ok(_) => { + // clean up the backup. + if existed { + try!(fs::remove_dir_all(&backup_db)); + } + Ok(()) + } + Err(e) => { + // restore the backup. + if existed { + try!(fs::rename(&backup_db, &self.client_db)); + } + Err(e.into()) + } + } + } + + // finalize the restoration. this accepts an already-locked + // restoration as an argument -- so acquiring it again _will_ + // lead to deadlock. + fn finalize_restoration(&self, rest: &mut Option<Restoration>) -> Result<(), Error> { + trace!(target: "snapshot", "finalizing restoration"); + + self.state_chunks.store(0, Ordering::SeqCst); + self.block_chunks.store(0, Ordering::SeqCst); + + // destroy the restoration before replacing databases.
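`replace_client_db` is a backup-and-swap: move the live database aside, move the restored one into place, and roll back the first rename if the second fails. A std-only sketch of that sequence (it assumes both paths live on one filesystem, so `rename` is atomic):

```rust
use std::{fs, io, path::Path};

// Sketch of the swap in `replace_client_db`: back up the live dir, move
// the restored dir into place, and restore the backup on failure.
fn swap_dirs(live: &Path, restored: &Path, backup: &Path) -> io::Result<()> {
    let _ = fs::remove_dir_all(backup); // stale backups are disposable.

    let had_live = match fs::rename(live, backup) {
        Ok(()) => true,
        Err(ref e) if e.kind() == io::ErrorKind::NotFound => false,
        Err(e) => return Err(e),
    };

    match fs::rename(restored, live) {
        Ok(()) => {
            if had_live { fs::remove_dir_all(backup)?; } // clean up the backup.
            Ok(())
        }
        Err(e) => {
            if had_live { fs::rename(backup, live)?; } // roll back.
            Err(e)
        }
    }
}
```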
+ *rest = None; + + try!(self.replace_client_db()); + + *self.status.lock() = RestorationStatus::Inactive; + + // TODO: take control of restored snapshot. + let _ = fs::remove_dir_all(self.restoration_dir()); + + Ok(()) + } + + /// Feed a chunk of either kind. no-op if no restoration or status is wrong. + fn feed_chunk(&self, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> { + match self.status() { + RestorationStatus::Inactive | RestorationStatus::Failed => Ok(()), + RestorationStatus::Ongoing => { + // TODO: be able to process block chunks and state chunks at same time? + let mut restoration = self.restoration.lock(); + + let res = { + let rest = match *restoration { + Some(ref mut r) => r, + None => return Ok(()), + }; + + match is_state { + true => rest.feed_state(hash, chunk), + false => rest.feed_blocks(hash, chunk, &*self.spec.engine), + }.map(|_| rest.is_done()) + }; + + match res { + Ok(is_done) => { + match is_state { + true => self.state_chunks.fetch_add(1, Ordering::SeqCst), + false => self.block_chunks.fetch_add(1, Ordering::SeqCst), + }; + + match is_done { + true => self.finalize_restoration(&mut *restoration), + false => Ok(()) + } + } + other => other.map(drop), + } + } + } + } + + /// Feed a state chunk to be processed synchronously. + pub fn feed_state_chunk(&self, hash: H256, chunk: &[u8]) { + match self.feed_chunk(hash, chunk, true) { + Ok(()) => (), + Err(e) => { + warn!("Encountered error during state restoration: {}", e); + *self.restoration.lock() = None; + *self.status.lock() = RestorationStatus::Failed; + let _ = fs::remove_dir_all(self.restoration_dir()); + } + } + } + + /// Feed a block chunk to be processed synchronously. + pub fn feed_block_chunk(&self, hash: H256, chunk: &[u8]) { + match self.feed_chunk(hash, chunk, false) { + Ok(()) => (), + Err(e) => { + warn!("Encountered error during block restoration: {}", e); + *self.restoration.lock() = None; + *self.status.lock() = RestorationStatus::Failed; + let _ = fs::remove_dir_all(self.restoration_dir()); + } + } + } +} + +impl SnapshotService for Service { + fn manifest(&self) -> Option { + self.reader.as_ref().map(|r| r.manifest().clone()) + } + + fn chunk(&self, hash: H256) -> Option { + self.reader.as_ref().and_then(|r| r.chunk(hash).ok()) + } + + fn status(&self) -> RestorationStatus { + *self.status.lock() + } + + fn chunks_done(&self) -> (usize, usize) { + (self.state_chunks.load(Ordering::Relaxed), self.block_chunks.load(Ordering::Relaxed)) + } + + fn begin_restore(&self, manifest: ManifestData) -> bool { + let rest_dir = self.restoration_dir(); + + let mut res = self.restoration.lock(); + + // tear down existing restoration. + *res = None; + + // delete and restore the restoration dir. + if let Err(e) = fs::remove_dir_all(&rest_dir).and_then(|_| fs::create_dir_all(&rest_dir)) { + match e.kind() { + ErrorKind::NotFound => {}, + _ => { + warn!("encountered error {} while beginning snapshot restoration.", e); + return false; + } + } + } + + // make new restoration. 
+ let db_path = self.restoration_db(); + *res = match Restoration::new(&manifest, self.pruning, &db_path, &self.spec) { + Ok(b) => Some(b), + Err(e) => { + warn!("encountered error {} while beginning snapshot restoration.", e); + return false; + } + }; + + *self.status.lock() = RestorationStatus::Ongoing; + true + } + + fn restore_state_chunk(&self, hash: H256, chunk: Bytes) { + self.io_channel.send(ClientIoMessage::FeedStateChunk(hash, chunk)) + .expect("snapshot service and io service are kept alive by client service; qed"); + } + + fn restore_block_chunk(&self, hash: H256, chunk: Bytes) { + self.io_channel.send(ClientIoMessage::FeedBlockChunk(hash, chunk)) + .expect("snapshot service and io service are kept alive by client service; qed"); + } +} \ No newline at end of file diff --git a/ethcore/src/snapshot/tests/blocks.rs b/ethcore/src/snapshot/tests/blocks.rs new file mode 100644 index 000000000..00aab8db4 --- /dev/null +++ b/ethcore/src/snapshot/tests/blocks.rs @@ -0,0 +1,91 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Block chunker and rebuilder tests. + +use devtools::RandomTempPath; + +use blockchain::generator::{ChainGenerator, ChainIterator, BlockFinalizer}; +use blockchain::BlockChain; +use snapshot::{chunk_blocks, BlockRebuilder}; +use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter}; + +use util::{Mutex, snappy}; +use util::kvdb::{Database, DatabaseConfig}; + +use std::sync::Arc; + +fn chunk_and_restore(amount: u64) { + let mut canon_chain = ChainGenerator::default(); + let mut finalizer = BlockFinalizer::default(); + let genesis = canon_chain.generate(&mut finalizer).unwrap(); + let db_cfg = DatabaseConfig::with_columns(::client::DB_NO_OF_COLUMNS); + + let orig_path = RandomTempPath::create_dir(); + let new_path = RandomTempPath::create_dir(); + let mut snapshot_path = new_path.as_path().to_owned(); + snapshot_path.push("SNAP"); + + let old_db = Arc::new(Database::open(&db_cfg, orig_path.as_str()).unwrap()); + let bc = BlockChain::new(Default::default(), &genesis, old_db.clone()); + + // build the blockchain. + for _ in 0..amount { + let block = canon_chain.generate(&mut finalizer).unwrap(); + let batch = old_db.transaction(); + bc.insert_block(&batch, &block, vec![]); + bc.commit(); + old_db.write(batch).unwrap(); + } + + let best_hash = bc.best_block_hash(); + + // snapshot it. + let writer = Mutex::new(PackedWriter::new(&snapshot_path).unwrap()); + let block_hashes = chunk_blocks(&bc, (amount, best_hash), &writer).unwrap(); + writer.into_inner().finish(::snapshot::ManifestData { + state_hashes: Vec::new(), + block_hashes: block_hashes, + state_root: Default::default(), + block_number: amount, + block_hash: best_hash, + }).unwrap(); + + // restore it. 
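Note that `restore_state_chunk` and `restore_block_chunk` above never process a chunk inline: they post it onto the client IO channel and the service thread feeds it later. A reduced sketch of that hand-off, with a std mpsc channel standing in for the IO service:

```rust
use std::sync::mpsc;
use std::thread;

// Sketch: callers enqueue (hash, bytes) chunks; a worker thread drains
// the queue, like ClientIoMessage::FeedStateChunk / FeedBlockChunk.
fn main() {
    let (tx, rx) = mpsc::channel::<(u64, Vec<u8>)>();

    let worker = thread::spawn(move || {
        for (hash, chunk) in rx {
            // stand-in for feed_state_chunk / feed_block_chunk.
            println!("processing chunk {:#x} ({} bytes)", hash, chunk.len());
        }
    });

    tx.send((0xdead_beef, vec![0u8; 16])).expect("worker alive");
    drop(tx); // closing the channel lets the worker drain and exit.
    worker.join().unwrap();
}
```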
+ let new_db = Arc::new(Database::open(&db_cfg, new_path.as_str()).unwrap()); + let new_chain = BlockChain::new(Default::default(), &genesis, new_db.clone()); + let mut rebuilder = BlockRebuilder::new(new_chain, amount).unwrap(); + let reader = PackedReader::new(&snapshot_path).unwrap().unwrap(); + let engine = ::engines::NullEngine::new(Default::default(), Default::default()); + for chunk_hash in &reader.manifest().block_hashes { + let compressed = reader.chunk(*chunk_hash).unwrap(); + let chunk = snappy::decompress(&compressed).unwrap(); + rebuilder.feed(&chunk, &engine).unwrap(); + } + + rebuilder.glue_chunks(); + drop(rebuilder); + + // and test it. + let new_chain = BlockChain::new(Default::default(), &genesis, new_db); + assert_eq!(new_chain.best_block_hash(), best_hash); +} + +#[test] +fn chunk_and_restore_500() { chunk_and_restore(500) } + +#[test] +fn chunk_and_restore_40k() { chunk_and_restore(40000) } \ No newline at end of file diff --git a/ethcore/src/snapshot/tests/helpers.rs b/ethcore/src/snapshot/tests/helpers.rs new file mode 100644 index 000000000..20fd4ae03 --- /dev/null +++ b/ethcore/src/snapshot/tests/helpers.rs @@ -0,0 +1,122 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Snapshot test helpers. These are used to build blockchains and state tries +//! which can be queried before and after a full snapshot/restore cycle. + +use account_db::AccountDBMut; +use rand::Rng; +use snapshot::account::Account; + +use util::hash::{FixedHash, H256}; +use util::hashdb::HashDB; +use util::trie::{Alphabet, StandardMap, SecTrieDBMut, TrieMut, ValueMode}; +use util::trie::{TrieDB, TrieDBMut}; +use util::rlp::SHA3_NULL_RLP; + +// the proportion of accounts we will alter each tick. +const ACCOUNT_CHURN: f32 = 0.01; + +/// This structure will incrementally alter a state given an rng. +pub struct StateProducer { + state_root: H256, + storage_seed: H256, +} + +impl StateProducer { + /// Create a new `StateProducer`. + pub fn new() -> Self { + StateProducer { + state_root: SHA3_NULL_RLP, + storage_seed: H256::zero(), + } + } + + /// Tick the state producer. This alters the state, writing new data into + /// the database. + pub fn tick(&mut self, rng: &mut R, db: &mut HashDB) { + // modify existing accounts. + let mut accounts_to_modify: Vec<_> = { + let trie = TrieDB::new(&*db, &self.state_root).unwrap(); + trie.iter() + .filter(|_| rng.gen::() < ACCOUNT_CHURN) + .map(|(k, v)| (H256::from_slice(&k), v.to_owned())) + .collect() + }; + + // sweep once to alter storage tries. + for &mut (ref mut address_hash, ref mut account_data) in &mut accounts_to_modify { + let mut account = Account::from_thin_rlp(&*account_data); + let acct_db = AccountDBMut::from_hash(db, *address_hash); + fill_storage(acct_db, account.storage_root_mut(), &mut self.storage_seed); + *account_data = account.to_thin_rlp(); + } + + // sweep again to alter account trie. 
+ let mut trie = TrieDBMut::from_existing(db, &mut self.state_root).unwrap(); + + for (address_hash, account_data) in accounts_to_modify { + trie.insert(&address_hash[..], &account_data).unwrap(); + } + + // add between 0 and 5 new accounts each tick. + let new_accs = rng.gen::() % 5; + + for _ in 0..new_accs { + let address_hash = H256::random(); + let balance: usize = rng.gen(); + let nonce: usize = rng.gen(); + let acc = ::account::Account::new_basic(balance.into(), nonce.into()).rlp(); + trie.insert(&address_hash[..], &acc).unwrap(); + } + } + + /// Get the current state root. + pub fn state_root(&self) -> H256 { + self.state_root + } +} + +/// Fill the storage of an account. +pub fn fill_storage(mut db: AccountDBMut, root: &mut H256, seed: &mut H256) { + let map = StandardMap { + alphabet: Alphabet::All, + min_key: 6, + journal_key: 6, + value_mode: ValueMode::Random, + count: 100, + }; + { + let mut trie = if *root == SHA3_NULL_RLP { + SecTrieDBMut::new(&mut db, root) + } else { + SecTrieDBMut::from_existing(&mut db, root).unwrap() + }; + + for (k, v) in map.make_with(seed) { + trie.insert(&k, &v).unwrap(); + } + } +} + +/// Compare two state dbs. +pub fn compare_dbs(one: &HashDB, two: &HashDB) { + let keys = one.keys(); + + for (key, _) in keys { + assert_eq!(one.get(&key).unwrap(), two.get(&key).unwrap()); + } +} \ No newline at end of file diff --git a/ethcore/src/snapshot/tests/mod.rs b/ethcore/src/snapshot/tests/mod.rs new file mode 100644 index 000000000..e8c288ebb --- /dev/null +++ b/ethcore/src/snapshot/tests/mod.rs @@ -0,0 +1,22 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Snapshot tests. + +mod blocks; +mod state; + +pub mod helpers; \ No newline at end of file diff --git a/ethcore/src/snapshot/tests/state.rs b/ethcore/src/snapshot/tests/state.rs new file mode 100644 index 000000000..a1a3763b8 --- /dev/null +++ b/ethcore/src/snapshot/tests/state.rs @@ -0,0 +1,82 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! State snapshotting tests. 
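`fill_storage` keeps generated tries reproducible by chaining the seed: each insertion derives fresh data from the seed and writes the result back into it, so one H256-style seed reproduces the same key/value stream every run. A std-only sketch of seed chaining, with `DefaultHasher` in place of the sha3-based `StandardMap`:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch: derive a deterministic (key, value) stream from a single seed,
// feeding each generated value back in as the next seed.
fn make_pairs(mut seed: u64, count: usize) -> Vec<(u64, u64)> {
    (0..count)
        .map(|_| {
            let mut h = DefaultHasher::new();
            seed.hash(&mut h);
            let key = h.finish();
            key.hash(&mut h);
            let value = h.finish();
            seed = value; // chain the seed forward.
            (key, value)
        })
        .collect()
}
```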
+ +use snapshot::{chunk_state, StateRebuilder}; +use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter}; +use super::helpers::{compare_dbs, StateProducer}; + +use rand; +use util::hash::H256; +use util::journaldb::{self, Algorithm}; +use util::kvdb::{Database, DatabaseConfig}; +use util::memorydb::MemoryDB; +use util::Mutex; +use devtools::RandomTempPath; + +use std::sync::Arc; + +#[test] +fn snap_and_restore() { + let mut producer = StateProducer::new(); + let mut rng = rand::thread_rng(); + let mut old_db = MemoryDB::new(); + let db_cfg = DatabaseConfig::with_columns(::client::DB_NO_OF_COLUMNS); + + for _ in 0..150 { + producer.tick(&mut rng, &mut old_db); + } + + let snap_dir = RandomTempPath::create_dir(); + let mut snap_file = snap_dir.as_path().to_owned(); + snap_file.push("SNAP"); + + let state_root = producer.state_root(); + let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap()); + + let state_hashes = chunk_state(&old_db, &state_root, &writer).unwrap(); + + writer.into_inner().finish(::snapshot::ManifestData { + state_hashes: state_hashes, + block_hashes: Vec::new(), + state_root: state_root, + block_number: 0, + block_hash: H256::default(), + }).unwrap(); + + let mut db_path = snap_dir.as_path().to_owned(); + db_path.push("db"); + let db = { + let new_db = Arc::new(Database::open(&db_cfg, &db_path.to_string_lossy()).unwrap()); + let mut rebuilder = StateRebuilder::new(new_db.clone(), Algorithm::Archive); + let reader = PackedReader::new(&snap_file).unwrap().unwrap(); + + for chunk_hash in &reader.manifest().state_hashes { + let raw = reader.chunk(*chunk_hash).unwrap(); + let chunk = ::util::snappy::decompress(&raw).unwrap(); + + rebuilder.feed(&chunk).unwrap(); + } + + assert_eq!(rebuilder.state_root(), state_root); + new_db + }; + + let new_db = journaldb::new(db, Algorithm::Archive, ::client::DB_COL_STATE); + + compare_dbs(&old_db, new_db.as_hashdb()); +} \ No newline at end of file diff --git a/ethcore/src/spec/spec.rs b/ethcore/src/spec/spec.rs index 6756d5524..a0c32d51a 100644 --- a/ethcore/src/spec/spec.rs +++ b/ethcore/src/spec/spec.rs @@ -29,6 +29,7 @@ use std::cell::RefCell; /// Parameters common to all engines. #[derive(Debug, PartialEq, Clone)] +#[cfg_attr(test, derive(Default))] pub struct CommonParams { /// Account start nonce. pub account_start_nonce: U256, @@ -60,7 +61,7 @@ pub struct Spec { /// User friendly spec name pub name: String, /// What engine are we using for this? - pub engine: Box<Engine>, + pub engine: Arc<Engine>, /// The fork identifier for this chain. Only needed to distinguish two chains sharing the same genesis. pub fork_name: Option<String>, @@ -130,14 +131,14 @@ impl From<ethjson::spec::Spec> for Spec { } impl Spec { - /// Convert engine spec into a boxed Engine of the right underlying type. + /// Convert engine spec into an Arc'd Engine of the right underlying type. /// TODO avoid this hard-coded nastiness - use dynamic-linked plugin framework instead.
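The spec change just above turns `Spec::engine` from a `Box` into an `Arc` because the snapshot service and the client now both need the consensus engine, potentially from different threads. A minimal sketch of why `Arc<dyn Trait>` is the right shape (stand-in trait, not the real `Engine`):

```rust
use std::sync::Arc;
use std::thread;

// Sketch: a shared, thread-safe trait object. Box<dyn Engine> has a
// single owner; Arc<dyn Engine> can be cloned cheaply for each user.
trait Engine: Send + Sync {
    fn name(&self) -> &str;
}

struct NullEngine;
impl Engine for NullEngine {
    fn name(&self) -> &str { "null" }
}

fn main() {
    let engine: Arc<dyn Engine> = Arc::new(NullEngine);
    let for_service = engine.clone(); // snapshot service's handle.

    let t = thread::spawn(move || println!("service uses {}", for_service.name()));
    println!("client uses {}", engine.name());
    t.join().unwrap();
}
```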
- fn engine(engine_spec: ethjson::spec::Engine, params: CommonParams, builtins: BTreeMap) -> Box { + fn engine(engine_spec: ethjson::spec::Engine, params: CommonParams, builtins: BTreeMap) -> Arc { match engine_spec { - ethjson::spec::Engine::Null => Box::new(NullEngine::new(params, builtins)), - ethjson::spec::Engine::InstantSeal => Box::new(InstantSeal::new(params, builtins)), - ethjson::spec::Engine::Ethash(ethash) => Box::new(ethereum::Ethash::new(params, From::from(ethash.params), builtins)), - ethjson::spec::Engine::BasicAuthority(basic_authority) => Box::new(BasicAuthority::new(params, From::from(basic_authority.params), builtins)), + ethjson::spec::Engine::Null => Arc::new(NullEngine::new(params, builtins)), + ethjson::spec::Engine::InstantSeal => Arc::new(InstantSeal::new(params, builtins)), + ethjson::spec::Engine::Ethash(ethash) => Arc::new(ethereum::Ethash::new(params, From::from(ethash.params), builtins)), + ethjson::spec::Engine::BasicAuthority(basic_authority) => Arc::new(BasicAuthority::new(params, From::from(basic_authority.params), builtins)), } } diff --git a/ethcore/src/state.rs b/ethcore/src/state.rs index 28091a29d..1dea50442 100644 --- a/ethcore/src/state.rs +++ b/ethcore/src/state.rs @@ -1489,4 +1489,4 @@ fn create_empty() { assert_eq!(state.root().hex(), "56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"); } -} +} \ No newline at end of file diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index d3677eb50..ef97c7528 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -25,7 +25,7 @@ use miner::Miner; #[test] fn imports_from_empty() { let dir = RandomTempPath::new(); - let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), Arc::new(Miner::with_spec(get_test_spec())), IoChannel::disconnected()).unwrap(); + let client = Client::new(ClientConfig::default(), &get_test_spec(), dir.as_path(), Arc::new(Miner::with_spec(get_test_spec())), IoChannel::disconnected()).unwrap(); client.import_verified_blocks(); client.flush_queue(); } @@ -43,7 +43,7 @@ fn returns_state_root_basic() { #[test] fn imports_good_block() { let dir = RandomTempPath::new(); - let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), Arc::new(Miner::with_spec(get_test_spec())), IoChannel::disconnected()).unwrap(); + let client = Client::new(ClientConfig::default(), &get_test_spec(), dir.as_path(), Arc::new(Miner::with_spec(get_test_spec())), IoChannel::disconnected()).unwrap(); let good_block = get_good_dummy_block(); if let Err(_) = client.import_block(good_block) { panic!("error importing block being good by definition"); @@ -58,7 +58,7 @@ fn imports_good_block() { #[test] fn query_none_block() { let dir = RandomTempPath::new(); - let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), Arc::new(Miner::with_spec(get_test_spec())), IoChannel::disconnected()).unwrap(); + let client = Client::new(ClientConfig::default(), &get_test_spec(), dir.as_path(), Arc::new(Miner::with_spec(get_test_spec())), IoChannel::disconnected()).unwrap(); let non_existant = client.block_header(BlockID::Number(188)); assert!(non_existant.is_none()); diff --git a/ethcore/src/tests/helpers.rs b/ethcore/src/tests/helpers.rs index 85d5e3ef9..b84f72c02 100644 --- a/ethcore/src/tests/helpers.rs +++ b/ethcore/src/tests/helpers.rs @@ -35,7 +35,7 @@ pub enum ChainEra { } pub struct TestEngine { - engine: Box, + engine: Arc, max_depth: usize } @@ -133,7 +133,7 @@ pub fn 
generate_dummy_client_with_spec_and_data(get_test_spec: F, block_numbe let dir = RandomTempPath::new(); let test_spec = get_test_spec(); - let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), Arc::new(Miner::with_spec(get_test_spec())), IoChannel::disconnected()).unwrap(); + let client = Client::new(ClientConfig::default(), &get_test_spec(), dir.as_path(), Arc::new(Miner::with_spec(get_test_spec())), IoChannel::disconnected()).unwrap(); let test_engine = &test_spec.engine; let mut db_result = get_temp_journal_db(); @@ -232,7 +232,7 @@ pub fn push_blocks_to_client(client: &Arc, timestamp_salt: u64, starting pub fn get_test_client_with_blocks(blocks: Vec) -> GuardedTempResult> { let dir = RandomTempPath::new(); - let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), Arc::new(Miner::with_spec(get_test_spec())), IoChannel::disconnected()).unwrap(); + let client = Client::new(ClientConfig::default(), &get_test_spec(), dir.as_path(), Arc::new(Miner::with_spec(get_test_spec())), IoChannel::disconnected()).unwrap(); for block in &blocks { if let Err(_) = client.import_block(block.clone()) { panic!("panic importing block which is well-formed"); diff --git a/ethcore/src/tests/rpc.rs b/ethcore/src/tests/rpc.rs index 5aaea356b..9d6ac0774 100644 --- a/ethcore/src/tests/rpc.rs +++ b/ethcore/src/tests/rpc.rs @@ -32,7 +32,7 @@ pub fn run_test_worker(scope: &crossbeam::Scope, stop: Arc, socket_p let temp = RandomTempPath::create_dir(); let client = Client::new( ClientConfig::default(), - get_test_spec(), + &get_test_spec(), temp.as_path(), Arc::new(Miner::with_spec(get_test_spec())), IoChannel::disconnected()).unwrap(); diff --git a/ethcore/src/types/blockchain_info.rs b/ethcore/src/types/blockchain_info.rs index 4e8634dba..75be55dc9 100644 --- a/ethcore/src/types/blockchain_info.rs +++ b/ethcore/src/types/blockchain_info.rs @@ -23,7 +23,7 @@ use std::mem; use std::collections::VecDeque; /// Information about the blockchain gathered together. -#[derive(Debug, Binary)] +#[derive(Clone, Debug, Binary)] pub struct BlockChainInfo { /// Blockchain difficulty. pub total_difficulty: U256, diff --git a/parity/cli.rs b/parity/cli.rs index b76bf9d5d..2c4239c34 100644 --- a/parity/cli.rs +++ b/parity/cli.rs @@ -32,6 +32,8 @@ Usage: parity import [ ] [options] parity export [ ] [options] parity signer new-token [options] + parity snapshot [options] + parity restore [options] Operating Options: --mode MODE Set the operating mode. 
MODE can be one of: @@ -286,6 +288,8 @@ pub struct Args { pub cmd_import: bool, pub cmd_signer: bool, pub cmd_new_token: bool, + pub cmd_snapshot: bool, + pub cmd_restore: bool, pub cmd_ui: bool, pub arg_pid_file: String, pub arg_file: Option, diff --git a/parity/configuration.rs b/parity/configuration.rs index 1aef086f5..18d54f91c 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -41,6 +41,7 @@ use run::RunCmd; use blockchain::{BlockchainCmd, ImportBlockchain, ExportBlockchain, DataFormat}; use presale::ImportWallet; use account::{AccountCmd, NewAccount, ImportAccounts}; +use snapshot::{self, SnapshotCommand}; #[derive(Debug, PartialEq)] pub enum Cmd { @@ -50,6 +51,7 @@ pub enum Cmd { ImportPresaleWallet(ImportWallet), Blockchain(BlockchainCmd), SignerToken(String), + Snapshot(SnapshotCommand), } #[derive(Debug, PartialEq)] @@ -156,6 +158,36 @@ impl Configuration { to_block: try!(to_block_id(&self.args.flag_to)), }; Cmd::Blockchain(BlockchainCmd::Export(export_cmd)) + } else if self.args.cmd_snapshot { + let snapshot_cmd = SnapshotCommand { + cache_config: cache_config, + dirs: dirs, + spec: spec, + pruning: pruning, + logger_config: logger_config, + mode: mode, + tracing: tracing, + compaction: compaction, + file_path: self.args.arg_file.clone(), + wal: wal, + kind: snapshot::Kind::Take, + }; + Cmd::Snapshot(snapshot_cmd) + } else if self.args.cmd_restore { + let restore_cmd = SnapshotCommand { + cache_config: cache_config, + dirs: dirs, + spec: spec, + pruning: pruning, + logger_config: logger_config, + mode: mode, + tracing: tracing, + compaction: compaction, + file_path: self.args.arg_file.clone(), + wal: wal, + kind: snapshot::Kind::Restore, + }; + Cmd::Snapshot(restore_cmd) } else { let daemon = if self.args.cmd_daemon { Some(self.args.arg_pid_file.clone()) diff --git a/parity/main.rs b/parity/main.rs index cc2810c08..bb9f5e743 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -82,6 +82,7 @@ mod blockchain; mod presale; mod run; mod sync; +mod snapshot; use std::{process, env}; use cli::print_version; @@ -99,6 +100,7 @@ fn execute(command: Cmd) -> Result { Cmd::ImportPresaleWallet(presale_cmd) => presale::execute(presale_cmd), Cmd::Blockchain(blockchain_cmd) => blockchain::execute(blockchain_cmd), Cmd::SignerToken(path) => signer::new_token(path), + Cmd::Snapshot(snapshot_cmd) => snapshot::execute(snapshot_cmd), } } @@ -131,4 +133,3 @@ fn main() { } } } - diff --git a/parity/run.rs b/parity/run.rs index d3aeaea2f..04037c9b6 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -171,7 +171,7 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { net_conf.boot_nodes = spec.nodes.clone(); } - // create client + // create client service. let service = try!(ClientService::start( client_config, spec, diff --git a/parity/snapshot.rs b/parity/snapshot.rs new file mode 100644 index 000000000..c357b7b22 --- /dev/null +++ b/parity/snapshot.rs @@ -0,0 +1,195 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+ +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Snapshot and restoration commands. + +use std::time::Duration; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use ethcore_logger::{setup_log, Config as LogConfig}; +use ethcore::snapshot::{RestorationStatus, SnapshotService}; +use ethcore::snapshot::io::{SnapshotReader, PackedReader, PackedWriter}; +use ethcore::service::ClientService; +use ethcore::client::{Mode, DatabaseCompactionProfile, Switch, VMType}; +use ethcore::miner::Miner; +use cache::CacheConfig; +use params::{SpecType, Pruning}; +use helpers::{to_client_config, execute_upgrades}; +use dir::Directories; +use fdlimit; + +use io::PanicHandler; + +/// Kinds of snapshot commands. +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum Kind { + /// Take a snapshot. + Take, + /// Restore a snapshot. + Restore +} + +/// Command for snapshot creation or restoration. +#[derive(Debug, PartialEq)] +pub struct SnapshotCommand { + pub cache_config: CacheConfig, + pub dirs: Directories, + pub spec: SpecType, + pub pruning: Pruning, + pub logger_config: LogConfig, + pub mode: Mode, + pub tracing: Switch, + pub compaction: DatabaseCompactionProfile, + pub file_path: Option, + pub wal: bool, + pub kind: Kind, +} + +impl SnapshotCommand { + // shared portion of snapshot commands: start the client service + fn start_service(self) -> Result<(ClientService, Arc), String> { + // Setup panic handler + let panic_handler = PanicHandler::new_in_arc(); + + // load spec file + let spec = try!(self.spec.spec()); + + // load genesis hash + let genesis_hash = spec.genesis_header().hash(); + + // Setup logging + let _logger = setup_log(&self.logger_config); + + fdlimit::raise_fd_limit(); + + // select pruning algorithm + let algorithm = self.pruning.to_algorithm(&self.dirs, genesis_hash, spec.fork_name.as_ref()); + + // prepare client_path + let client_path = self.dirs.client_path(genesis_hash, spec.fork_name.as_ref(), algorithm); + + // execute upgrades + try!(execute_upgrades(&self.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, self.compaction.compaction_profile())); + + // prepare client config + let client_config = to_client_config(&self.cache_config, &self.dirs, genesis_hash, self.mode, self.tracing, self.pruning, self.compaction, self.wal, VMType::default(), "".into(), spec.fork_name.as_ref()); + + let service = try!(ClientService::start( + client_config, + spec, + Path::new(&client_path), + Arc::new(Miner::with_spec(try!(self.spec.spec()))) + ).map_err(|e| format!("Client service error: {:?}", e))); + + Ok((service, panic_handler)) + } + + /// restore from a snapshot + pub fn restore(self) -> Result<(), String> { + let file = try!(self.file_path.clone().ok_or("No file path provided.".to_owned())); + let (service, _panic_handler) = try!(self.start_service()); + + warn!("Snapshot restoration is experimental and the format may be subject to change."); + + let snapshot = service.snapshot_service(); + let reader = PackedReader::new(&Path::new(&file)) + .map_err(|e| format!("Couldn't open snapshot file: {}", e)) + .and_then(|x| x.ok_or("Snapshot file has invalid format.".into())); + + let reader = try!(reader); + let manifest = reader.manifest(); + + // drop the client so we don't restore while it has open DB handles. 
+ drop(service); + + if !snapshot.begin_restore(manifest.clone()) { + return Err("Failed to begin restoration.".into()); + } + + let (num_state, num_blocks) = (manifest.state_hashes.len(), manifest.block_hashes.len()); + + let informant_handle = snapshot.clone(); + ::std::thread::spawn(move || { + while let RestorationStatus::Ongoing = informant_handle.status() { + let (state_chunks, block_chunks) = informant_handle.chunks_done(); + info!("Processed {}/{} state chunks and {}/{} block chunks.", + state_chunks, num_state, block_chunks, num_blocks); + + ::std::thread::sleep(Duration::from_secs(5)); + } + }); + + info!("Restoring state"); + for &state_hash in &manifest.state_hashes { + if snapshot.status() == RestorationStatus::Failed { + return Err("Restoration failed".into()); + } + + let chunk = try!(reader.chunk(state_hash) + .map_err(|e| format!("Encountered error while reading chunk {:?}: {}", state_hash, e))); + snapshot.feed_state_chunk(state_hash, &chunk); + } + + info!("Restoring blocks"); + for &block_hash in &manifest.block_hashes { + if snapshot.status() == RestorationStatus::Failed { + return Err("Restoration failed".into()); + } + + let chunk = try!(reader.chunk(block_hash) + .map_err(|e| format!("Encountered error while reading chunk {:?}: {}", block_hash, e))); + snapshot.feed_block_chunk(block_hash, &chunk); + } + + match snapshot.status() { + RestorationStatus::Ongoing => Err("Snapshot file is incomplete and missing chunks.".into()), + RestorationStatus::Failed => Err("Snapshot restoration failed.".into()), + RestorationStatus::Inactive => { + info!("Restoration complete."); + Ok(()) + } + } + } + + /// Take a snapshot from the head of the chain. + pub fn take_snapshot(self) -> Result<(), String> { + let file_path = try!(self.file_path.clone().ok_or("No file path provided.".to_owned())); + let file_path: PathBuf = file_path.into(); + let (service, _panic_handler) = try!(self.start_service()); + + warn!("Snapshots are currently experimental. File formats may be subject to change."); + + let writer = try!(PackedWriter::new(&file_path) + .map_err(|e| format!("Failed to open snapshot writer: {}", e))); + + if let Err(e) = service.client().take_snapshot(writer) { + let _ = ::std::fs::remove_file(&file_path); + return Err(format!("Encountered fatal error while creating snapshot: {}", e)); + } + + Ok(()) + } +} + +/// Execute this snapshot command. +pub fn execute(cmd: SnapshotCommand) -> Result { + match cmd.kind { + Kind::Take => try!(cmd.take_snapshot()), + Kind::Restore => try!(cmd.restore()), + } + + Ok(String::new()) +} \ No newline at end of file diff --git a/rpc/src/v1/tests/eth.rs b/rpc/src/v1/tests/eth.rs index 8ae58a129..20c83b49b 100644 --- a/rpc/src/v1/tests/eth.rs +++ b/rpc/src/v1/tests/eth.rs @@ -111,7 +111,7 @@ impl EthTester { let dir = RandomTempPath::new(); let account_provider = account_provider(); let miner_service = miner_service(spec_provider(), account_provider.clone()); - let client = Client::new(ClientConfig::default(), spec_provider(), dir.as_path(), miner_service.clone(), IoChannel::disconnected()).unwrap(); + let client = Client::new(ClientConfig::default(), &spec_provider(), dir.as_path(), miner_service.clone(), IoChannel::disconnected()).unwrap(); let sync_provider = sync_provider(); let external_miner = Arc::new(ExternalMiner::default()); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 7f991ee74..3aa93662e 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -51,7 +51,7 @@ //! ); //! let client = Client::new( //! ClientConfig::default(), -//! 
ethereum::new_frontier(), +//! &ethereum::new_frontier(), //! &dir, //! miner, //! IoChannel::disconnected() diff --git a/util/src/journaldb/mod.rs b/util/src/journaldb/mod.rs index bb79ed9a8..5f6b8f5f9 100644 --- a/util/src/journaldb/mod.rs +++ b/util/src/journaldb/mod.rs @@ -184,4 +184,4 @@ mod tests { assert_eq!(overlayrecent, 1); assert_eq!(refcounted, 1); } -} +} \ No newline at end of file diff --git a/util/src/journaldb/refcounteddb.rs b/util/src/journaldb/refcounteddb.rs index a584d7b29..b29a45212 100644 --- a/util/src/journaldb/refcounteddb.rs +++ b/util/src/journaldb/refcounteddb.rs @@ -19,7 +19,7 @@ use common::*; use rlp::*; use hashdb::*; -use overlaydb::*; +use overlaydb::OverlayDB; use super::{DB_PREFIX_LEN, LATEST_ERA_KEY}; use super::traits::JournalDB; use kvdb::{Database, DBTransaction}; diff --git a/util/src/memorydb.rs b/util/src/memorydb.rs index 7a3169f7a..4376d173c 100644 --- a/util/src/memorydb.rs +++ b/util/src/memorydb.rs @@ -134,7 +134,7 @@ impl MemoryDB { if key == &SHA3_NULL_RLP { return Some(STATIC_NULL_RLP.clone()); } - self.data.get(key).map(|&(ref v, x)| (&v[..], x)) + self.data.get(key).map(|&(ref val, rc)| (&val[..], rc)) } /// Denote than an existing value has the given key. Used when a key gets removed without diff --git a/util/src/overlaydb.rs b/util/src/overlaydb.rs index 8511aa42a..7310ef536 100644 --- a/util/src/overlaydb.rs +++ b/util/src/overlaydb.rs @@ -314,4 +314,4 @@ fn playpen() { db.write(batch).unwrap(); } fs::remove_dir_all("/tmp/test").unwrap(); -} +} \ No newline at end of file