Merge branch 'master' of github.com:ethcore/parity into spec_load_errors
This commit is contained in:
@@ -19,5 +19,6 @@ extern crate ethcore_ipc_codegen;
|
||||
fn main() {
|
||||
ethcore_ipc_codegen::derive_binary("src/types/mod.rs.in").unwrap();
|
||||
ethcore_ipc_codegen::derive_ipc("src/client/traits.rs").unwrap();
|
||||
ethcore_ipc_codegen::derive_ipc("src/snapshot/snapshot_service_trait.rs").unwrap();
|
||||
ethcore_ipc_codegen::derive_ipc("src/client/chain_notify.rs").unwrap();
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ const MIN_MEM_LIMIT: usize = 16384;
|
||||
const MIN_QUEUE_LIMIT: usize = 512;
|
||||
|
||||
/// Block queue configuration
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub struct BlockQueueConfig {
|
||||
/// Maximum number of blocks to keep in unverified queue.
|
||||
/// When the limit is reached, is_full returns true.
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
//! Blockchain configuration.
|
||||
|
||||
/// Blockchain configuration.
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub struct Config {
|
||||
/// Preferred cache size in bytes.
|
||||
pub pref_cache_size: usize,
|
||||
|
||||
@@ -20,7 +20,7 @@ use util::H256;
|
||||
/// Represents what has to be handled by actor listening to chain events
|
||||
#[derive(Ipc)]
|
||||
pub trait ChainNotify : Send + Sync {
|
||||
/// fires when chain has new blocks
|
||||
/// fires when chain has new blocks.
|
||||
fn new_blocks(&self,
|
||||
_imported: Vec<H256>,
|
||||
_invalid: Vec<H256>,
|
||||
|
||||
@@ -32,7 +32,7 @@ use util::kvdb::*;
|
||||
// other
|
||||
use io::*;
|
||||
use views::{BlockView, HeaderView, BodyView};
|
||||
use error::{ImportError, ExecutionError, CallError, BlockError, ImportResult};
|
||||
use error::{ImportError, ExecutionError, CallError, BlockError, ImportResult, Error as EthcoreError};
|
||||
use header::BlockNumber;
|
||||
use state::State;
|
||||
use spec::Spec;
|
||||
@@ -122,11 +122,13 @@ impl SleepState {
|
||||
/// Call `import_block()` to import a block asynchronously; `flush_queue()` flushes the queue.
|
||||
pub struct Client {
|
||||
mode: Mode,
|
||||
chain: Arc<BlockChain>,
|
||||
tracedb: Arc<TraceDB<BlockChain>>,
|
||||
chain: RwLock<Arc<BlockChain>>,
|
||||
tracedb: RwLock<TraceDB<BlockChain>>,
|
||||
engine: Arc<Engine>,
|
||||
db: Arc<Database>,
|
||||
state_db: Mutex<Box<JournalDB>>,
|
||||
config: ClientConfig,
|
||||
db: RwLock<Arc<Database>>,
|
||||
pruning: journaldb::Algorithm,
|
||||
state_db: RwLock<Box<JournalDB>>,
|
||||
block_queue: BlockQueue,
|
||||
report: RwLock<ClientReport>,
|
||||
import_lock: Mutex<()>,
|
||||
@@ -168,8 +170,8 @@ impl Client {
|
||||
db_config.wal = config.db_wal;
|
||||
|
||||
let db = Arc::new(try!(Database::open(&db_config, &path.to_str().unwrap()).map_err(ClientError::Database)));
|
||||
let chain = Arc::new(BlockChain::new(config.blockchain, &gb, db.clone()));
|
||||
let tracedb = Arc::new(try!(TraceDB::new(config.tracing, db.clone(), chain.clone())));
|
||||
let chain = Arc::new(BlockChain::new(config.blockchain.clone(), &gb, db.clone()));
|
||||
let tracedb = RwLock::new(try!(TraceDB::new(config.tracing.clone(), db.clone(), chain.clone())));
|
||||
|
||||
let mut state_db = journaldb::new(db.clone(), config.pruning, ::db::COL_STATE);
|
||||
if state_db.is_empty() && try!(spec.ensure_db_good(state_db.as_hashdb_mut())) {
|
||||
@@ -184,32 +186,34 @@ impl Client {
|
||||
|
||||
let engine = spec.engine.clone();
|
||||
|
||||
let block_queue = BlockQueue::new(config.queue, engine.clone(), message_channel.clone());
|
||||
let block_queue = BlockQueue::new(config.queue.clone(), engine.clone(), message_channel.clone());
|
||||
let panic_handler = PanicHandler::new_in_arc();
|
||||
panic_handler.forward_from(&block_queue);
|
||||
|
||||
let awake = match config.mode { Mode::Dark(..) => false, _ => true };
|
||||
|
||||
let factories = Factories {
|
||||
vm: EvmFactory::new(config.vm_type),
|
||||
trie: TrieFactory::new(config.trie_spec),
|
||||
vm: EvmFactory::new(config.vm_type.clone()),
|
||||
trie: TrieFactory::new(config.trie_spec.clone()),
|
||||
accountdb: Default::default(),
|
||||
};
|
||||
|
||||
let client = Client {
|
||||
sleep_state: Mutex::new(SleepState::new(awake)),
|
||||
liveness: AtomicBool::new(awake),
|
||||
mode: config.mode,
|
||||
chain: chain,
|
||||
mode: config.mode.clone(),
|
||||
chain: RwLock::new(chain),
|
||||
tracedb: tracedb,
|
||||
engine: engine,
|
||||
db: db,
|
||||
state_db: Mutex::new(state_db),
|
||||
pruning: config.pruning.clone(),
|
||||
verifier: verification::new(config.verifier_type.clone()),
|
||||
config: config,
|
||||
db: RwLock::new(db),
|
||||
state_db: RwLock::new(state_db),
|
||||
block_queue: block_queue,
|
||||
report: RwLock::new(Default::default()),
|
||||
import_lock: Mutex::new(()),
|
||||
panic_handler: panic_handler,
|
||||
verifier: verification::new(config.verifier_type),
|
||||
miner: miner,
|
||||
io_channel: message_channel,
|
||||
notify: RwLock::new(Vec::new()),
|
||||
@@ -253,8 +257,9 @@ impl Client {
|
||||
let mut last_hashes = LastHashes::new();
|
||||
last_hashes.resize(256, H256::default());
|
||||
last_hashes[0] = parent_hash;
|
||||
let chain = self.chain.read();
|
||||
for i in 0..255 {
|
||||
match self.chain.block_details(&last_hashes[i]) {
|
||||
match chain.block_details(&last_hashes[i]) {
|
||||
Some(details) => {
|
||||
last_hashes[i + 1] = details.parent.clone();
|
||||
},
|
||||
@@ -270,22 +275,23 @@ impl Client {
|
||||
let engine = &*self.engine;
|
||||
let header = &block.header;
|
||||
|
||||
let chain = self.chain.read();
|
||||
// Check the block isn't so old we won't be able to enact it.
|
||||
let best_block_number = self.chain.best_block_number();
|
||||
let best_block_number = chain.best_block_number();
|
||||
if best_block_number >= HISTORY && header.number() <= best_block_number - HISTORY {
|
||||
warn!(target: "client", "Block import failed for #{} ({})\nBlock is ancient (current best block: #{}).", header.number(), header.hash(), best_block_number);
|
||||
return Err(());
|
||||
}
|
||||
|
||||
// Verify Block Family
|
||||
let verify_family_result = self.verifier.verify_block_family(header, &block.bytes, engine, &*self.chain);
|
||||
let verify_family_result = self.verifier.verify_block_family(header, &block.bytes, engine, &**chain);
|
||||
if let Err(e) = verify_family_result {
|
||||
warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
|
||||
return Err(());
|
||||
};
|
||||
|
||||
// Check if Parent is in chain
|
||||
let chain_has_parent = self.chain.block_header(header.parent_hash());
|
||||
let chain_has_parent = chain.block_header(header.parent_hash());
|
||||
if let None = chain_has_parent {
|
||||
warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash());
|
||||
return Err(());
|
||||
@@ -294,9 +300,9 @@ impl Client {
|
||||
// Enact Verified Block
|
||||
let parent = chain_has_parent.unwrap();
|
||||
let last_hashes = self.build_last_hashes(header.parent_hash().clone());
|
||||
let db = self.state_db.lock().boxed_clone();
|
||||
let db = self.state_db.read().boxed_clone();
|
||||
|
||||
let enact_result = enact_verified(block, engine, self.tracedb.tracing_enabled(), db, &parent, last_hashes, self.factories.clone());
|
||||
let enact_result = enact_verified(block, engine, self.tracedb.read().tracing_enabled(), db, &parent, last_hashes, self.factories.clone());
|
||||
if let Err(e) = enact_result {
|
||||
warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
|
||||
return Err(());
|
||||
@@ -408,17 +414,18 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
self.db.flush().expect("DB flush failed.");
|
||||
self.db.read().flush().expect("DB flush failed.");
|
||||
imported
|
||||
}
|
||||
|
||||
fn commit_block<B>(&self, block: B, hash: &H256, block_data: &[u8]) -> ImportRoute where B: IsBlock + Drain {
|
||||
let number = block.header().number();
|
||||
let parent = block.header().parent_hash().clone();
|
||||
let chain = self.chain.read();
|
||||
// Are we committing an era?
|
||||
let ancient = if number >= HISTORY {
|
||||
let n = number - HISTORY;
|
||||
Some((n, self.chain.block_hash(n).unwrap()))
|
||||
Some((n, chain.block_hash(n).unwrap()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -432,14 +439,14 @@ impl Client {
|
||||
|
||||
//let traces = From::from(block.traces().clone().unwrap_or_else(Vec::new));
|
||||
|
||||
let mut batch = DBTransaction::new(&self.db);
|
||||
let mut batch = DBTransaction::new(&self.db.read());
|
||||
// CHECK! I *think* this is fine, even if the state_root is equal to another
|
||||
// already-imported block of the same number.
|
||||
// TODO: Prove it with a test.
|
||||
block.drain().commit(&mut batch, number, hash, ancient).expect("DB commit failed.");
|
||||
|
||||
let route = self.chain.insert_block(&mut batch, block_data, receipts);
|
||||
self.tracedb.import(&mut batch, TraceImportRequest {
|
||||
let route = chain.insert_block(&mut batch, block_data, receipts);
|
||||
self.tracedb.read().import(&mut batch, TraceImportRequest {
|
||||
traces: traces.into(),
|
||||
block_hash: hash.clone(),
|
||||
block_number: number,
|
||||
@@ -447,8 +454,8 @@ impl Client {
|
||||
retracted: route.retracted.len()
|
||||
});
|
||||
// Final commit to the DB
|
||||
self.db.write_buffered(batch);
|
||||
self.chain.commit();
|
||||
self.db.read().write_buffered(batch);
|
||||
chain.commit();
|
||||
|
||||
self.update_last_hashes(&parent, hash);
|
||||
route
|
||||
@@ -491,10 +498,10 @@ impl Client {
|
||||
};
|
||||
|
||||
self.block_header(id).and_then(|header| {
|
||||
let db = self.state_db.lock().boxed_clone();
|
||||
let db = self.state_db.read().boxed_clone();
|
||||
|
||||
// early exit for pruned blocks
|
||||
if db.is_pruned() && self.chain.best_block_number() >= block_number + HISTORY {
|
||||
if db.is_pruned() && self.chain.read().best_block_number() >= block_number + HISTORY {
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -522,7 +529,7 @@ impl Client {
|
||||
/// Get a copy of the best block's state.
|
||||
pub fn state(&self) -> State {
|
||||
State::from_existing(
|
||||
self.state_db.lock().boxed_clone(),
|
||||
self.state_db.read().boxed_clone(),
|
||||
HeaderView::new(&self.best_block_header()).state_root(),
|
||||
self.engine.account_start_nonce(),
|
||||
self.factories.clone())
|
||||
@@ -531,22 +538,22 @@ impl Client {
|
||||
|
||||
/// Get info on the cache.
|
||||
pub fn blockchain_cache_info(&self) -> BlockChainCacheSize {
|
||||
self.chain.cache_size()
|
||||
self.chain.read().cache_size()
|
||||
}
|
||||
|
||||
/// Get the report.
|
||||
pub fn report(&self) -> ClientReport {
|
||||
let mut report = self.report.read().clone();
|
||||
report.state_db_mem = self.state_db.lock().mem_used();
|
||||
report.state_db_mem = self.state_db.read().mem_used();
|
||||
report
|
||||
}
|
||||
|
||||
/// Tick the client.
|
||||
// TODO: manage by real events.
|
||||
pub fn tick(&self) {
|
||||
self.chain.collect_garbage();
|
||||
self.chain.read().collect_garbage();
|
||||
self.block_queue.collect_garbage();
|
||||
self.tracedb.collect_garbage();
|
||||
self.tracedb.read().collect_garbage();
|
||||
|
||||
match self.mode {
|
||||
Mode::Dark(timeout) => {
|
||||
@@ -584,16 +591,16 @@ impl Client {
|
||||
pub fn block_number(&self, id: BlockID) -> Option<BlockNumber> {
|
||||
match id {
|
||||
BlockID::Number(number) => Some(number),
|
||||
BlockID::Hash(ref hash) => self.chain.block_number(hash),
|
||||
BlockID::Hash(ref hash) => self.chain.read().block_number(hash),
|
||||
BlockID::Earliest => Some(0),
|
||||
BlockID::Latest | BlockID::Pending => Some(self.chain.best_block_number()),
|
||||
BlockID::Latest | BlockID::Pending => Some(self.chain.read().best_block_number()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Take a snapshot at the given block.
|
||||
/// If the ID given is "latest", this will default to 1000 blocks behind.
|
||||
pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W, at: BlockID, p: &snapshot::Progress) -> Result<(), ::error::Error> {
|
||||
let db = self.state_db.lock().boxed_clone();
|
||||
pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W, at: BlockID, p: &snapshot::Progress) -> Result<(), EthcoreError> {
|
||||
let db = self.state_db.read().boxed_clone();
|
||||
let best_block_number = self.chain_info().best_block_number;
|
||||
let block_number = try!(self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at)));
|
||||
|
||||
@@ -618,7 +625,7 @@ impl Client {
|
||||
},
|
||||
};
|
||||
|
||||
try!(snapshot::take_snapshot(&self.chain, start_hash, db.as_hashdb(), writer, p));
|
||||
try!(snapshot::take_snapshot(&self.chain.read(), start_hash, db.as_hashdb(), writer, p));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -634,8 +641,8 @@ impl Client {
|
||||
|
||||
fn transaction_address(&self, id: TransactionID) -> Option<TransactionAddress> {
|
||||
match id {
|
||||
TransactionID::Hash(ref hash) => self.chain.transaction_address(hash),
|
||||
TransactionID::Location(id, index) => Self::block_hash(&self.chain, id).map(|hash| TransactionAddress {
|
||||
TransactionID::Hash(ref hash) => self.chain.read().transaction_address(hash),
|
||||
TransactionID::Location(id, index) => Self::block_hash(&self.chain.read(), id).map(|hash| TransactionAddress {
|
||||
block_hash: hash,
|
||||
index: index,
|
||||
})
|
||||
@@ -666,6 +673,25 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
impl snapshot::DatabaseRestore for Client {
|
||||
/// Restart the client with a new backend
|
||||
fn restore_db(&self, new_db: &str) -> Result<(), EthcoreError> {
|
||||
let _import_lock = self.import_lock.lock();
|
||||
let mut state_db = self.state_db.write();
|
||||
let mut chain = self.chain.write();
|
||||
let mut tracedb = self.tracedb.write();
|
||||
self.miner.clear();
|
||||
let db = self.db.write();
|
||||
try!(db.restore(new_db));
|
||||
|
||||
*state_db = journaldb::new(db.clone(), self.pruning, ::db::COL_STATE);
|
||||
*chain = Arc::new(BlockChain::new(self.config.blockchain.clone(), &[], db.clone()));
|
||||
*tracedb = try!(TraceDB::new(self.config.tracing.clone(), db.clone(), chain.clone()).map_err(ClientError::from));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl BlockChainClient for Client {
|
||||
fn call(&self, t: &SignedTransaction, block: BlockID, analytics: CallAnalytics) -> Result<Executed, CallError> {
|
||||
let header = try!(self.block_header(block).ok_or(CallError::StatePruned));
|
||||
@@ -749,15 +775,17 @@ impl BlockChainClient for Client {
|
||||
}
|
||||
|
||||
fn best_block_header(&self) -> Bytes {
|
||||
self.chain.best_block_header()
|
||||
self.chain.read().best_block_header()
|
||||
}
|
||||
|
||||
fn block_header(&self, id: BlockID) -> Option<Bytes> {
|
||||
Self::block_hash(&self.chain, id).and_then(|hash| self.chain.block_header_data(&hash))
|
||||
let chain = self.chain.read();
|
||||
Self::block_hash(&chain, id).and_then(|hash| chain.block_header_data(&hash))
|
||||
}
|
||||
|
||||
fn block_body(&self, id: BlockID) -> Option<Bytes> {
|
||||
Self::block_hash(&self.chain, id).and_then(|hash| self.chain.block_body(&hash))
|
||||
let chain = self.chain.read();
|
||||
Self::block_hash(&chain, id).and_then(|hash| chain.block_body(&hash))
|
||||
}
|
||||
|
||||
fn block(&self, id: BlockID) -> Option<Bytes> {
|
||||
@@ -766,14 +794,16 @@ impl BlockChainClient for Client {
|
||||
return Some(block.rlp_bytes(Seal::Without));
|
||||
}
|
||||
}
|
||||
Self::block_hash(&self.chain, id).and_then(|hash| {
|
||||
self.chain.block(&hash)
|
||||
let chain = self.chain.read();
|
||||
Self::block_hash(&chain, id).and_then(|hash| {
|
||||
chain.block(&hash)
|
||||
})
|
||||
}
|
||||
|
||||
fn block_status(&self, id: BlockID) -> BlockStatus {
|
||||
match Self::block_hash(&self.chain, id) {
|
||||
Some(ref hash) if self.chain.is_known(hash) => BlockStatus::InChain,
|
||||
let chain = self.chain.read();
|
||||
match Self::block_hash(&chain, id) {
|
||||
Some(ref hash) if chain.is_known(hash) => BlockStatus::InChain,
|
||||
Some(hash) => self.block_queue.block_status(&hash),
|
||||
None => BlockStatus::Unknown
|
||||
}
|
||||
@@ -785,7 +815,8 @@ impl BlockChainClient for Client {
|
||||
return Some(*block.header.difficulty() + self.block_total_difficulty(BlockID::Latest).expect("blocks in chain have details; qed"));
|
||||
}
|
||||
}
|
||||
Self::block_hash(&self.chain, id).and_then(|hash| self.chain.block_details(&hash)).map(|d| d.total_difficulty)
|
||||
let chain = self.chain.read();
|
||||
Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty)
|
||||
}
|
||||
|
||||
fn nonce(&self, address: &Address, id: BlockID) -> Option<U256> {
|
||||
@@ -793,7 +824,8 @@ impl BlockChainClient for Client {
|
||||
}
|
||||
|
||||
fn block_hash(&self, id: BlockID) -> Option<H256> {
|
||||
Self::block_hash(&self.chain, id)
|
||||
let chain = self.chain.read();
|
||||
Self::block_hash(&chain, id)
|
||||
}
|
||||
|
||||
fn code(&self, address: &Address, id: BlockID) -> Option<Option<Bytes>> {
|
||||
@@ -809,7 +841,7 @@ impl BlockChainClient for Client {
|
||||
}
|
||||
|
||||
fn transaction(&self, id: TransactionID) -> Option<LocalizedTransaction> {
|
||||
self.transaction_address(id).and_then(|address| self.chain.transaction(&address))
|
||||
self.transaction_address(id).and_then(|address| self.chain.read().transaction(&address))
|
||||
}
|
||||
|
||||
fn uncle(&self, id: UncleID) -> Option<Bytes> {
|
||||
@@ -818,11 +850,12 @@ impl BlockChainClient for Client {
|
||||
}
|
||||
|
||||
fn transaction_receipt(&self, id: TransactionID) -> Option<LocalizedReceipt> {
|
||||
self.transaction_address(id).and_then(|address| self.chain.block_number(&address.block_hash).and_then(|block_number| {
|
||||
let t = self.chain.block_body(&address.block_hash)
|
||||
let chain = self.chain.read();
|
||||
self.transaction_address(id).and_then(|address| chain.block_number(&address.block_hash).and_then(|block_number| {
|
||||
let t = chain.block_body(&address.block_hash)
|
||||
.and_then(|block| BodyView::new(&block).localized_transaction_at(&address.block_hash, block_number, address.index));
|
||||
|
||||
match (t, self.chain.transaction_receipt(&address)) {
|
||||
match (t, chain.transaction_receipt(&address)) {
|
||||
(Some(tx), Some(receipt)) => {
|
||||
let block_hash = tx.block_hash.clone();
|
||||
let block_number = tx.block_number.clone();
|
||||
@@ -832,7 +865,7 @@ impl BlockChainClient for Client {
|
||||
0 => U256::zero(),
|
||||
i => {
|
||||
let prior_address = TransactionAddress { block_hash: address.block_hash, index: i - 1 };
|
||||
let prior_receipt = self.chain.transaction_receipt(&prior_address).expect("Transaction receipt at `address` exists; `prior_address` has lower index in same block; qed");
|
||||
let prior_receipt = chain.transaction_receipt(&prior_address).expect("Transaction receipt at `address` exists; `prior_address` has lower index in same block; qed");
|
||||
prior_receipt.gas_used
|
||||
}
|
||||
};
|
||||
@@ -863,28 +896,29 @@ impl BlockChainClient for Client {
|
||||
}
|
||||
|
||||
fn tree_route(&self, from: &H256, to: &H256) -> Option<TreeRoute> {
|
||||
match self.chain.is_known(from) && self.chain.is_known(to) {
|
||||
true => Some(self.chain.tree_route(from.clone(), to.clone())),
|
||||
let chain = self.chain.read();
|
||||
match chain.is_known(from) && chain.is_known(to) {
|
||||
true => Some(chain.tree_route(from.clone(), to.clone())),
|
||||
false => None
|
||||
}
|
||||
}
|
||||
|
||||
fn find_uncles(&self, hash: &H256) -> Option<Vec<H256>> {
|
||||
self.chain.find_uncle_hashes(hash, self.engine.maximum_uncle_age())
|
||||
self.chain.read().find_uncle_hashes(hash, self.engine.maximum_uncle_age())
|
||||
}
|
||||
|
||||
fn state_data(&self, hash: &H256) -> Option<Bytes> {
|
||||
self.state_db.lock().state(hash)
|
||||
self.state_db.read().state(hash)
|
||||
}
|
||||
|
||||
fn block_receipts(&self, hash: &H256) -> Option<Bytes> {
|
||||
self.chain.block_receipts(hash).map(|receipts| ::rlp::encode(&receipts).to_vec())
|
||||
self.chain.read().block_receipts(hash).map(|receipts| ::rlp::encode(&receipts).to_vec())
|
||||
}
|
||||
|
||||
fn import_block(&self, bytes: Bytes) -> Result<H256, BlockImportError> {
|
||||
{
|
||||
let header = BlockView::new(&bytes).header_view();
|
||||
if self.chain.is_known(&header.sha3()) {
|
||||
if self.chain.read().is_known(&header.sha3()) {
|
||||
return Err(BlockImportError::Import(ImportError::AlreadyInChain));
|
||||
}
|
||||
if self.block_status(BlockID::Hash(header.parent_hash())) == BlockStatus::Unknown {
|
||||
@@ -903,12 +937,13 @@ impl BlockChainClient for Client {
|
||||
}
|
||||
|
||||
fn chain_info(&self) -> BlockChainInfo {
|
||||
let chain = self.chain.read();
|
||||
BlockChainInfo {
|
||||
total_difficulty: self.chain.best_block_total_difficulty(),
|
||||
pending_total_difficulty: self.chain.best_block_total_difficulty(),
|
||||
genesis_hash: self.chain.genesis_hash(),
|
||||
best_block_hash: self.chain.best_block_hash(),
|
||||
best_block_number: From::from(self.chain.best_block_number())
|
||||
total_difficulty: chain.best_block_total_difficulty(),
|
||||
pending_total_difficulty: chain.best_block_total_difficulty(),
|
||||
genesis_hash: chain.genesis_hash(),
|
||||
best_block_hash: chain.best_block_hash(),
|
||||
best_block_number: From::from(chain.best_block_number())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -918,7 +953,7 @@ impl BlockChainClient for Client {
|
||||
|
||||
fn blocks_with_bloom(&self, bloom: &H2048, from_block: BlockID, to_block: BlockID) -> Option<Vec<BlockNumber>> {
|
||||
match (self.block_number(from_block), self.block_number(to_block)) {
|
||||
(Some(from), Some(to)) => Some(self.chain.blocks_with_bloom(bloom, from, to)),
|
||||
(Some(from), Some(to)) => Some(self.chain.read().blocks_with_bloom(bloom, from, to)),
|
||||
_ => None
|
||||
}
|
||||
}
|
||||
@@ -936,10 +971,11 @@ impl BlockChainClient for Client {
|
||||
|
||||
blocks.sort();
|
||||
|
||||
let chain = self.chain.read();
|
||||
blocks.into_iter()
|
||||
.filter_map(|number| self.chain.block_hash(number).map(|hash| (number, hash)))
|
||||
.filter_map(|(number, hash)| self.chain.block_receipts(&hash).map(|r| (number, hash, r.receipts)))
|
||||
.filter_map(|(number, hash, receipts)| self.chain.block_body(&hash).map(|ref b| (number, hash, receipts, BodyView::new(b).transaction_hashes())))
|
||||
.filter_map(|number| chain.block_hash(number).map(|hash| (number, hash)))
|
||||
.filter_map(|(number, hash)| chain.block_receipts(&hash).map(|r| (number, hash, r.receipts)))
|
||||
.filter_map(|(number, hash, receipts)| chain.block_body(&hash).map(|ref b| (number, hash, receipts, BodyView::new(b).transaction_hashes())))
|
||||
.flat_map(|(number, hash, receipts, hashes)| {
|
||||
let mut log_index = 0;
|
||||
receipts.into_iter()
|
||||
@@ -975,7 +1011,7 @@ impl BlockChainClient for Client {
|
||||
to_address: From::from(filter.to_address),
|
||||
};
|
||||
|
||||
let traces = self.tracedb.filter(&filter);
|
||||
let traces = self.tracedb.read().filter(&filter);
|
||||
Some(traces)
|
||||
} else {
|
||||
None
|
||||
@@ -987,7 +1023,7 @@ impl BlockChainClient for Client {
|
||||
self.transaction_address(trace.transaction)
|
||||
.and_then(|tx_address| {
|
||||
self.block_number(BlockID::Hash(tx_address.block_hash))
|
||||
.and_then(|number| self.tracedb.trace(number, tx_address.index, trace_address))
|
||||
.and_then(|number| self.tracedb.read().trace(number, tx_address.index, trace_address))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -995,17 +1031,17 @@ impl BlockChainClient for Client {
|
||||
self.transaction_address(transaction)
|
||||
.and_then(|tx_address| {
|
||||
self.block_number(BlockID::Hash(tx_address.block_hash))
|
||||
.and_then(|number| self.tracedb.transaction_traces(number, tx_address.index))
|
||||
.and_then(|number| self.tracedb.read().transaction_traces(number, tx_address.index))
|
||||
})
|
||||
}
|
||||
|
||||
fn block_traces(&self, block: BlockID) -> Option<Vec<LocalizedTrace>> {
|
||||
self.block_number(block)
|
||||
.and_then(|number| self.tracedb.block_traces(number))
|
||||
.and_then(|number| self.tracedb.read().block_traces(number))
|
||||
}
|
||||
|
||||
fn last_hashes(&self) -> LastHashes {
|
||||
(*self.build_last_hashes(self.chain.best_block_hash())).clone()
|
||||
(*self.build_last_hashes(self.chain.read().best_block_hash())).clone()
|
||||
}
|
||||
|
||||
fn queue_transactions(&self, transactions: Vec<Bytes>) {
|
||||
@@ -1032,14 +1068,15 @@ impl BlockChainClient for Client {
|
||||
impl MiningBlockChainClient for Client {
|
||||
fn prepare_open_block(&self, author: Address, gas_range_target: (U256, U256), extra_data: Bytes) -> OpenBlock {
|
||||
let engine = &*self.engine;
|
||||
let h = self.chain.best_block_hash();
|
||||
let chain = self.chain.read();
|
||||
let h = chain.best_block_hash();
|
||||
|
||||
let mut open_block = OpenBlock::new(
|
||||
engine,
|
||||
self.factories.clone(),
|
||||
false, // TODO: this will need to be parameterised once we want to do immediate mining insertion.
|
||||
self.state_db.lock().boxed_clone(),
|
||||
&self.chain.block_header(&h).expect("h is best block hash: so its header must exist: qed"),
|
||||
self.state_db.read().boxed_clone(),
|
||||
&chain.block_header(&h).expect("h is best block hash: so its header must exist: qed"),
|
||||
self.build_last_hashes(h.clone()),
|
||||
author,
|
||||
gas_range_target,
|
||||
@@ -1047,7 +1084,7 @@ impl MiningBlockChainClient for Client {
|
||||
).expect("OpenBlock::new only fails if parent state root invalid; state root of best block's header is never invalid; qed");
|
||||
|
||||
// Add uncles
|
||||
self.chain
|
||||
chain
|
||||
.find_uncle_headers(&h, engine.maximum_uncle_age())
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
@@ -1088,7 +1125,7 @@ impl MiningBlockChainClient for Client {
|
||||
precise_time_ns() - start,
|
||||
);
|
||||
});
|
||||
self.db.flush().expect("DB flush failed.");
|
||||
self.db.read().flush().expect("DB flush failed.");
|
||||
Ok(h)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,7 +62,7 @@ impl FromStr for DatabaseCompactionProfile {
|
||||
}
|
||||
|
||||
/// Operating mode for the client.
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub enum Mode {
|
||||
/// Always on.
|
||||
Active,
|
||||
|
||||
@@ -227,6 +227,11 @@ impl Miner {
|
||||
self.options.force_sealing || !self.options.new_work_notify.is_empty()
|
||||
}
|
||||
|
||||
/// Clear all pending block states
|
||||
pub fn clear(&self) {
|
||||
self.sealing_work.lock().queue.reset();
|
||||
}
|
||||
|
||||
/// Get `Some` `clone()` of the current pending block's state or `None` if we're not sealing.
|
||||
pub fn pending_state(&self) -> Option<State> {
|
||||
self.sealing_work.lock().queue.peek_last_ref().map(|b| b.block().fields().state.clone())
|
||||
|
||||
@@ -46,6 +46,8 @@ pub enum ClientIoMessage {
|
||||
FeedStateChunk(H256, Bytes),
|
||||
/// Feed a block chunk to the snapshot service
|
||||
FeedBlockChunk(H256, Bytes),
|
||||
/// Take a snapshot for the block with given number.
|
||||
TakeSnapshot(u64),
|
||||
}
|
||||
|
||||
/// Client service setup. Creates and registers client and network services with the IO subsystem.
|
||||
@@ -78,7 +80,7 @@ impl ClientService {
|
||||
|
||||
let pruning = config.pruning;
|
||||
let client = try!(Client::new(config, &spec, db_path, miner, io_service.channel()));
|
||||
let snapshot = try!(SnapshotService::new(spec, pruning, db_path.into(), io_service.channel()));
|
||||
let snapshot = try!(SnapshotService::new(spec, pruning, db_path.into(), io_service.channel(), client.clone()));
|
||||
|
||||
let snapshot = Arc::new(snapshot);
|
||||
|
||||
@@ -90,7 +92,7 @@ impl ClientService {
|
||||
try!(io_service.register_handler(client_io));
|
||||
|
||||
let stop_guard = ::devtools::StopGuard::new();
|
||||
run_ipc(ipc_path, client.clone(), stop_guard.share());
|
||||
run_ipc(ipc_path, client.clone(), snapshot.clone(), stop_guard.share());
|
||||
|
||||
Ok(ClientService {
|
||||
io_service: Arc::new(io_service),
|
||||
@@ -145,16 +147,22 @@ struct ClientIoHandler {
|
||||
}
|
||||
|
||||
const CLIENT_TICK_TIMER: TimerToken = 0;
|
||||
const SNAPSHOT_TICK_TIMER: TimerToken = 1;
|
||||
|
||||
const CLIENT_TICK_MS: u64 = 5000;
|
||||
const SNAPSHOT_TICK_MS: u64 = 10000;
|
||||
|
||||
impl IoHandler<ClientIoMessage> for ClientIoHandler {
|
||||
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
|
||||
io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer");
|
||||
io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK_MS).expect("Error registering snapshot timer");
|
||||
}
|
||||
|
||||
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
|
||||
if timer == CLIENT_TICK_TIMER {
|
||||
self.client.tick();
|
||||
match timer {
|
||||
CLIENT_TICK_TIMER => self.client.tick(),
|
||||
SNAPSHOT_TICK_TIMER => self.snapshot.tick(),
|
||||
_ => warn!("IO service triggered unregistered timer '{}'", timer),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,20 +178,38 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
|
||||
}
|
||||
ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => self.snapshot.feed_state_chunk(*hash, chunk),
|
||||
ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => self.snapshot.feed_block_chunk(*hash, chunk),
|
||||
ClientIoMessage::TakeSnapshot(num) => {
|
||||
if let Err(e) = self.snapshot.take_snapshot(&*self.client, num) {
|
||||
warn!("Failed to take snapshot at block #{}: {}", num, e);
|
||||
}
|
||||
}
|
||||
_ => {} // ignore other messages
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature="ipc")]
|
||||
fn run_ipc(base_path: &Path, client: Arc<Client>, stop: Arc<AtomicBool>) {
|
||||
fn run_ipc(base_path: &Path, client: Arc<Client>, snapshot_service: Arc<SnapshotService>, stop: Arc<AtomicBool>) {
|
||||
let mut path = base_path.to_owned();
|
||||
path.push("parity-chain.ipc");
|
||||
let socket_addr = format!("ipc://{}", path.to_string_lossy());
|
||||
let s = stop.clone();
|
||||
::std::thread::spawn(move || {
|
||||
let mut worker = nanoipc::Worker::new(&(client as Arc<BlockChainClient>));
|
||||
worker.add_reqrep(&socket_addr).expect("Ipc expected to initialize with no issues");
|
||||
|
||||
while !s.load(::std::sync::atomic::Ordering::Relaxed) {
|
||||
worker.poll();
|
||||
}
|
||||
});
|
||||
|
||||
let mut path = base_path.to_owned();
|
||||
path.push("parity-snapshot.ipc");
|
||||
let socket_addr = format!("ipc://{}", path.to_string_lossy());
|
||||
::std::thread::spawn(move || {
|
||||
let mut worker = nanoipc::Worker::new(&(snapshot_service as Arc<::snapshot::SnapshotService>));
|
||||
worker.add_reqrep(&socket_addr).expect("Ipc expected to initialize with no issues");
|
||||
|
||||
while !stop.load(::std::sync::atomic::Ordering::Relaxed) {
|
||||
worker.poll();
|
||||
}
|
||||
@@ -191,7 +217,7 @@ fn run_ipc(base_path: &Path, client: Arc<Client>, stop: Arc<AtomicBool>) {
|
||||
}
|
||||
|
||||
#[cfg(not(feature="ipc"))]
|
||||
fn run_ipc(_base_path: &Path, _client: Arc<Client>, _stop: Arc<AtomicBool>) {
|
||||
fn run_ipc(_base_path: &Path, _client: Arc<Client>, _snapshot_service: Arc<SnapshotService>, _stop: Arc<AtomicBool>) {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -32,9 +32,9 @@ use util::Mutex;
|
||||
use util::hash::{FixedHash, H256};
|
||||
use util::journaldb::{self, Algorithm, JournalDB};
|
||||
use util::kvdb::Database;
|
||||
use util::sha3::SHA3_NULL_RLP;
|
||||
use util::trie::{TrieDB, TrieDBMut, Trie, TrieMut};
|
||||
use rlp::{DecoderError, RlpStream, Stream, UntrustedRlp, View, Compressible, RlpType};
|
||||
use util::sha3::SHA3_NULL_RLP;
|
||||
use rlp::{RlpStream, Stream, UntrustedRlp, View, Compressible, RlpType};
|
||||
|
||||
use self::account::Account;
|
||||
use self::block::AbridgedBlock;
|
||||
@@ -44,7 +44,12 @@ use crossbeam::{scope, ScopedJoinHandle};
|
||||
use rand::{Rng, OsRng};
|
||||
|
||||
pub use self::error::Error;
|
||||
pub use self::service::{RestorationStatus, Service, SnapshotService};
|
||||
|
||||
pub use self::service::{Service, DatabaseRestore};
|
||||
pub use self::traits::{SnapshotService, RemoteSnapshotService};
|
||||
pub use self::watcher::Watcher;
|
||||
pub use types::snapshot_manifest::ManifestData;
|
||||
pub use types::restoration_status::RestorationStatus;
|
||||
|
||||
pub mod io;
|
||||
pub mod service;
|
||||
@@ -52,10 +57,16 @@ pub mod service;
|
||||
mod account;
|
||||
mod block;
|
||||
mod error;
|
||||
mod watcher;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
mod traits {
|
||||
#![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues
|
||||
include!(concat!(env!("OUT_DIR"), "/snapshot_service_trait.rs"));
|
||||
}
|
||||
|
||||
// Try to have chunks be around 4MB (before compression)
|
||||
const PREFERRED_CHUNK_SIZE: usize = 4 * 1024 * 1024;
|
||||
|
||||
@@ -72,17 +83,28 @@ pub struct Progress {
|
||||
}
|
||||
|
||||
impl Progress {
|
||||
/// Reset the progress.
|
||||
pub fn reset(&self) {
|
||||
self.accounts.store(0, Ordering::Release);
|
||||
self.blocks.store(0, Ordering::Release);
|
||||
self.size.store(0, Ordering::Release);
|
||||
|
||||
// atomic fence here to ensure the others are written first?
|
||||
// logs might very rarely get polluted if not.
|
||||
self.done.store(false, Ordering::Release);
|
||||
}
|
||||
|
||||
/// Get the number of accounts snapshotted thus far.
|
||||
pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Relaxed) }
|
||||
pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Acquire) }
|
||||
|
||||
/// Get the number of blocks snapshotted thus far.
|
||||
pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Relaxed) }
|
||||
pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) }
|
||||
|
||||
/// Get the written size of the snapshot in bytes.
|
||||
pub fn size(&self) -> usize { self.size.load(Ordering::Relaxed) }
|
||||
pub fn size(&self) -> usize { self.size.load(Ordering::Acquire) }
|
||||
|
||||
/// Whether the snapshot is complete.
|
||||
pub fn done(&self) -> bool { self.done.load(Ordering::SeqCst) }
|
||||
pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) }
|
||||
|
||||
}
|
||||
/// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.
|
||||
@@ -354,54 +376,6 @@ pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex<SnapshotWriter +
|
||||
Ok(chunker.hashes)
|
||||
}
|
||||
|
||||
/// Manifest data.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ManifestData {
|
||||
/// List of state chunk hashes.
|
||||
pub state_hashes: Vec<H256>,
|
||||
/// List of block chunk hashes.
|
||||
pub block_hashes: Vec<H256>,
|
||||
/// The final, expected state root.
|
||||
pub state_root: H256,
|
||||
/// Block number this snapshot was taken at.
|
||||
pub block_number: u64,
|
||||
/// Block hash this snapshot was taken at.
|
||||
pub block_hash: H256,
|
||||
}
|
||||
|
||||
impl ManifestData {
|
||||
/// Encode the manifest data to rlp.
|
||||
pub fn into_rlp(self) -> Bytes {
|
||||
let mut stream = RlpStream::new_list(5);
|
||||
stream.append(&self.state_hashes);
|
||||
stream.append(&self.block_hashes);
|
||||
stream.append(&self.state_root);
|
||||
stream.append(&self.block_number);
|
||||
stream.append(&self.block_hash);
|
||||
|
||||
stream.out()
|
||||
}
|
||||
|
||||
/// Try to restore manifest data from raw bytes, interpreted as RLP.
|
||||
pub fn from_rlp(raw: &[u8]) -> Result<Self, DecoderError> {
|
||||
let decoder = UntrustedRlp::new(raw);
|
||||
|
||||
let state_hashes: Vec<H256> = try!(decoder.val_at(0));
|
||||
let block_hashes: Vec<H256> = try!(decoder.val_at(1));
|
||||
let state_root: H256 = try!(decoder.val_at(2));
|
||||
let block_number: u64 = try!(decoder.val_at(3));
|
||||
let block_hash: H256 = try!(decoder.val_at(4));
|
||||
|
||||
Ok(ManifestData {
|
||||
state_hashes: state_hashes,
|
||||
block_hashes: block_hashes,
|
||||
state_root: state_root,
|
||||
block_number: block_number,
|
||||
block_hash: block_hash,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Used to rebuild the state trie piece by piece.
|
||||
pub struct StateRebuilder {
|
||||
db: Box<JournalDB>,
|
||||
@@ -653,4 +627,4 @@ impl BlockRebuilder {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,12 +23,14 @@ use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use super::{ManifestData, StateRebuilder, BlockRebuilder};
|
||||
use super::{ManifestData, StateRebuilder, BlockRebuilder, RestorationStatus, SnapshotService};
|
||||
use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter};
|
||||
|
||||
use blockchain::BlockChain;
|
||||
use client::Client;
|
||||
use engines::Engine;
|
||||
use error::Error;
|
||||
use ids::BlockID;
|
||||
use service::ClientIoMessage;
|
||||
use spec::Spec;
|
||||
|
||||
@@ -39,51 +41,27 @@ use util::journaldb::Algorithm;
|
||||
use util::kvdb::{Database, DatabaseConfig};
|
||||
use util::snappy;
|
||||
|
||||
/// Statuses for restorations.
|
||||
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
||||
pub enum RestorationStatus {
|
||||
/// No restoration.
|
||||
Inactive,
|
||||
/// Ongoing restoration.
|
||||
Ongoing,
|
||||
/// Failed restoration.
|
||||
Failed,
|
||||
/// Helper for removing directories in case of error.
|
||||
struct Guard(bool, PathBuf);
|
||||
|
||||
impl Guard {
|
||||
fn new(path: PathBuf) -> Self { Guard(true, path) }
|
||||
|
||||
fn disarm(mut self) { self.0 = false }
|
||||
}
|
||||
|
||||
/// The interface for a snapshot network service.
|
||||
/// This handles:
|
||||
/// - restoration of snapshots to temporary databases.
|
||||
/// - responding to queries for snapshot manifests and chunks
|
||||
pub trait SnapshotService {
|
||||
/// Query the most recent manifest data.
|
||||
fn manifest(&self) -> Option<ManifestData>;
|
||||
impl Drop for Guard {
|
||||
fn drop(&mut self) {
|
||||
if self.0 {
|
||||
let _ = fs::remove_dir_all(&self.1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get raw chunk for a given hash.
|
||||
fn chunk(&self, hash: H256) -> Option<Bytes>;
|
||||
|
||||
/// Ask the snapshot service for the restoration status.
|
||||
fn status(&self) -> RestorationStatus;
|
||||
|
||||
/// Ask the snapshot service for the number of chunks completed.
|
||||
/// Return a tuple of (state_chunks, block_chunks).
|
||||
/// Undefined when not restoring.
|
||||
fn chunks_done(&self) -> (usize, usize);
|
||||
|
||||
/// Begin snapshot restoration.
|
||||
/// If restoration in-progress, this will reset it.
|
||||
/// From this point on, any previous snapshot may become unavailable.
|
||||
fn begin_restore(&self, manifest: ManifestData);
|
||||
|
||||
/// Abort an in-progress restoration if there is one.
|
||||
fn abort_restore(&self);
|
||||
|
||||
/// Feed a raw state chunk to the service to be processed asynchronously.
|
||||
/// no-op if not currently restoring.
|
||||
fn restore_state_chunk(&self, hash: H256, chunk: Bytes);
|
||||
|
||||
/// Feed a raw block chunk to the service to be processed asynchronously.
|
||||
/// no-op if currently restoring.
|
||||
fn restore_block_chunk(&self, hash: H256, chunk: Bytes);
|
||||
/// External database restoration handler
|
||||
pub trait DatabaseRestore: Send + Sync {
|
||||
/// Restart with a new backend. Takes ownership of passed database and moves it to a new location.
|
||||
fn restore_db(&self, new_db: &str) -> Result<(), Error>;
|
||||
}
|
||||
|
||||
/// State restoration manager.
|
||||
@@ -96,6 +74,7 @@ struct Restoration {
|
||||
writer: LooseWriter,
|
||||
snappy_buffer: Bytes,
|
||||
final_state_root: H256,
|
||||
guard: Guard,
|
||||
}
|
||||
|
||||
struct RestorationParams<'a> {
|
||||
@@ -104,6 +83,7 @@ struct RestorationParams<'a> {
|
||||
db_path: PathBuf, // database path
|
||||
writer: LooseWriter, // writer for recovered snapshot.
|
||||
genesis: &'a [u8], // genesis block of the chain.
|
||||
guard: Guard, // guard for the restoration directory.
|
||||
}
|
||||
|
||||
impl Restoration {
|
||||
@@ -131,6 +111,7 @@ impl Restoration {
|
||||
writer: params.writer,
|
||||
snappy_buffer: Vec::new(),
|
||||
final_state_root: root,
|
||||
guard: params.guard,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -179,6 +160,7 @@ impl Restoration {
|
||||
|
||||
try!(self.writer.finish(self.manifest));
|
||||
|
||||
self.guard.disarm();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -208,11 +190,13 @@ pub struct Service {
|
||||
genesis_block: Bytes,
|
||||
state_chunks: AtomicUsize,
|
||||
block_chunks: AtomicUsize,
|
||||
db_restore: Arc<DatabaseRestore>,
|
||||
progress: super::Progress,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
/// Create a new snapshot service.
|
||||
pub fn new(spec: &Spec, pruning: Algorithm, client_db: PathBuf, io_channel: Channel) -> Result<Self, Error> {
|
||||
pub fn new(spec: &Spec, pruning: Algorithm, client_db: PathBuf, io_channel: Channel, db_restore: Arc<DatabaseRestore>) -> Result<Self, Error> {
|
||||
let db_path = try!(client_db.parent().and_then(Path::parent)
|
||||
.ok_or_else(|| UtilError::SimpleString("Failed to find database root.".into()))).to_owned();
|
||||
|
||||
@@ -236,6 +220,8 @@ impl Service {
|
||||
genesis_block: spec.genesis_block(),
|
||||
state_chunks: AtomicUsize::new(0),
|
||||
block_chunks: AtomicUsize::new(0),
|
||||
db_restore: db_restore,
|
||||
progress: Default::default(),
|
||||
};
|
||||
|
||||
// create the root snapshot dir if it doesn't exist.
|
||||
@@ -252,6 +238,13 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
// delete the temporary snapshot dir if it does exist.
|
||||
if let Err(e) = fs::remove_dir_all(service.temp_snapshot_dir()) {
|
||||
if e.kind() != ErrorKind::NotFound {
|
||||
return Err(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
Ok(service)
|
||||
}
|
||||
|
||||
@@ -269,6 +262,13 @@ impl Service {
|
||||
dir
|
||||
}
|
||||
|
||||
// get the temporary snapshot dir.
|
||||
fn temp_snapshot_dir(&self) -> PathBuf {
|
||||
let mut dir = self.root_dir();
|
||||
dir.push("in_progress");
|
||||
dir
|
||||
}
|
||||
|
||||
// get the restoration directory.
|
||||
fn restoration_dir(&self) -> PathBuf {
|
||||
let mut dir = self.root_dir();
|
||||
@@ -295,37 +295,50 @@ impl Service {
|
||||
let our_db = self.restoration_db();
|
||||
|
||||
trace!(target: "snapshot", "replacing {:?} with {:?}", self.client_db, our_db);
|
||||
try!(self.db_restore.restore_db(our_db.to_str().unwrap()));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let mut backup_db = self.restoration_dir();
|
||||
backup_db.push("backup_db");
|
||||
/// Tick the snapshot service. This will log any active snapshot
|
||||
/// being taken.
|
||||
pub fn tick(&self) {
|
||||
if self.progress.done() { return }
|
||||
|
||||
let _ = fs::remove_dir_all(&backup_db);
|
||||
let p = &self.progress;
|
||||
info!("Snapshot: {} accounts {} blocks {} bytes", p.accounts(), p.blocks(), p.size());
|
||||
}
|
||||
|
||||
let existed = match fs::rename(&self.client_db, &backup_db) {
|
||||
Ok(_) => true,
|
||||
Err(e) => if let ErrorKind::NotFound = e.kind() {
|
||||
false
|
||||
} else {
|
||||
return Err(e.into());
|
||||
}
|
||||
};
|
||||
/// Take a snapshot at the block with the given number.
|
||||
/// calling this while a restoration is in progress or vice versa
|
||||
/// will lead to a race condition where the first one to finish will
|
||||
/// have their produced snapshot overwritten.
|
||||
pub fn take_snapshot(&self, client: &Client, num: u64) -> Result<(), Error> {
|
||||
info!("Taking snapshot at #{}", num);
|
||||
self.progress.reset();
|
||||
|
||||
match fs::rename(&our_db, &self.client_db) {
|
||||
Ok(_) => {
|
||||
// clean up the backup.
|
||||
if existed {
|
||||
try!(fs::remove_dir_all(&backup_db));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
// restore the backup.
|
||||
if existed {
|
||||
try!(fs::rename(&backup_db, &self.client_db));
|
||||
}
|
||||
Err(e.into())
|
||||
}
|
||||
}
|
||||
let temp_dir = self.temp_snapshot_dir();
|
||||
let snapshot_dir = self.snapshot_dir();
|
||||
|
||||
let _ = fs::remove_dir_all(&temp_dir);
|
||||
|
||||
let writer = try!(LooseWriter::new(temp_dir.clone()));
|
||||
|
||||
let guard = Guard::new(temp_dir.clone());
|
||||
try!(client.take_snapshot(writer, BlockID::Number(num), &self.progress));
|
||||
|
||||
info!("Finished taking snapshot at #{}", num);
|
||||
|
||||
let mut reader = self.reader.write();
|
||||
|
||||
// destroy the old snapshot reader.
|
||||
*reader = None;
|
||||
|
||||
try!(fs::rename(temp_dir, &snapshot_dir));
|
||||
|
||||
*reader = Some(try!(LooseReader::new(snapshot_dir)));
|
||||
|
||||
guard.disarm();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Initialize the restoration synchronously.
|
||||
@@ -356,11 +369,15 @@ impl Service {
|
||||
db_path: self.restoration_db(),
|
||||
writer: writer,
|
||||
genesis: &self.genesis_block,
|
||||
guard: Guard::new(rest_dir),
|
||||
};
|
||||
|
||||
*res = Some(try!(Restoration::new(params)));
|
||||
|
||||
*self.status.lock() = RestorationStatus::Ongoing;
|
||||
*self.status.lock() = RestorationStatus::Ongoing {
|
||||
state_chunks_done: self.state_chunks.load(Ordering::Relaxed) as u32,
|
||||
block_chunks_done: self.block_chunks.load(Ordering::Relaxed) as u32,
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -393,14 +410,7 @@ impl Service {
|
||||
try!(fs::create_dir(&snapshot_dir));
|
||||
|
||||
trace!(target: "snapshot", "copying restored snapshot files over");
|
||||
for maybe_file in try!(fs::read_dir(self.temp_recovery_dir())) {
|
||||
let path = try!(maybe_file).path();
|
||||
if let Some(name) = path.file_name().map(|x| x.to_owned()) {
|
||||
let mut new_path = snapshot_dir.clone();
|
||||
new_path.push(name);
|
||||
try!(fs::rename(path, new_path));
|
||||
}
|
||||
}
|
||||
try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir));
|
||||
|
||||
let _ = fs::remove_dir_all(self.restoration_dir());
|
||||
|
||||
@@ -418,7 +428,7 @@ impl Service {
|
||||
|
||||
match self.status() {
|
||||
RestorationStatus::Inactive | RestorationStatus::Failed => Ok(()),
|
||||
RestorationStatus::Ongoing => {
|
||||
RestorationStatus::Ongoing { .. } => {
|
||||
let res = {
|
||||
let rest = match *restoration {
|
||||
Some(ref mut r) => r,
|
||||
@@ -489,10 +499,6 @@ impl SnapshotService for Service {
|
||||
*self.status.lock()
|
||||
}
|
||||
|
||||
fn chunks_done(&self) -> (usize, usize) {
|
||||
(self.state_chunks.load(Ordering::Relaxed), self.block_chunks.load(Ordering::Relaxed))
|
||||
}
|
||||
|
||||
fn begin_restore(&self, manifest: ManifestData) {
|
||||
self.io_channel.send(ClientIoMessage::BeginRestoration(manifest))
|
||||
.expect("snapshot service and io service are kept alive by client service; qed");
|
||||
@@ -520,17 +526,31 @@ impl SnapshotService for Service {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Service {
|
||||
fn drop(&mut self) {
|
||||
self.abort_restore();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use service::ClientIoMessage;
|
||||
use io::{IoService};
|
||||
use devtools::RandomTempPath;
|
||||
use tests::helpers::get_test_spec;
|
||||
use util::journaldb::Algorithm;
|
||||
|
||||
use snapshot::ManifestData;
|
||||
use error::Error;
|
||||
use snapshot::{ManifestData, RestorationStatus, SnapshotService};
|
||||
use super::*;
|
||||
|
||||
struct NoopDBRestore;
|
||||
impl DatabaseRestore for NoopDBRestore {
|
||||
fn restore_db(&self, _new_db: &str) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sends_async_messages() {
|
||||
let service = IoService::<ClientIoMessage>::start().unwrap();
|
||||
@@ -544,13 +564,13 @@ mod tests {
|
||||
&get_test_spec(),
|
||||
Algorithm::Archive,
|
||||
dir,
|
||||
service.channel()
|
||||
service.channel(),
|
||||
Arc::new(NoopDBRestore),
|
||||
).unwrap();
|
||||
|
||||
assert!(service.manifest().is_none());
|
||||
assert!(service.chunk(Default::default()).is_none());
|
||||
assert_eq!(service.status(), RestorationStatus::Inactive);
|
||||
assert_eq!(service.chunks_done(), (0, 0));
|
||||
|
||||
let manifest = ManifestData {
|
||||
state_hashes: vec![],
|
||||
|
||||
54
ethcore/src/snapshot/snapshot_service_trait.rs
Normal file
54
ethcore/src/snapshot/snapshot_service_trait.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use super::{ManifestData, RestorationStatus};
|
||||
use util::{Bytes, H256};
|
||||
use ipc::IpcConfig;
|
||||
|
||||
/// The interface for a snapshot network service.
|
||||
/// This handles:
|
||||
/// - restoration of snapshots to temporary databases.
|
||||
/// - responding to queries for snapshot manifests and chunks
|
||||
#[derive(Ipc)]
|
||||
#[ipc(client_ident="RemoteSnapshotService")]
|
||||
pub trait SnapshotService : Sync + Send {
|
||||
/// Query the most recent manifest data.
|
||||
fn manifest(&self) -> Option<ManifestData>;
|
||||
|
||||
/// Get raw chunk for a given hash.
|
||||
fn chunk(&self, hash: H256) -> Option<Bytes>;
|
||||
|
||||
/// Ask the snapshot service for the restoration status.
|
||||
fn status(&self) -> RestorationStatus;
|
||||
|
||||
/// Begin snapshot restoration.
|
||||
/// If restoration in-progress, this will reset it.
|
||||
/// From this point on, any previous snapshot may become unavailable.
|
||||
fn begin_restore(&self, manifest: ManifestData);
|
||||
|
||||
/// Abort an in-progress restoration if there is one.
|
||||
fn abort_restore(&self);
|
||||
|
||||
/// Feed a raw state chunk to the service to be processed asynchronously.
|
||||
/// no-op if not currently restoring.
|
||||
fn restore_state_chunk(&self, hash: H256, chunk: Bytes);
|
||||
|
||||
/// Feed a raw block chunk to the service to be processed asynchronously.
|
||||
/// no-op if currently restoring.
|
||||
fn restore_block_chunk(&self, hash: H256, chunk: Bytes);
|
||||
}
|
||||
|
||||
impl IpcConfig for SnapshotService { }
|
||||
192
ethcore/src/snapshot/watcher.rs
Normal file
192
ethcore/src/snapshot/watcher.rs
Normal file
@@ -0,0 +1,192 @@
|
||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Watcher for snapshot-related chain events.
|
||||
|
||||
use client::{BlockChainClient, Client, ChainNotify};
|
||||
use ids::BlockID;
|
||||
use service::ClientIoMessage;
|
||||
use views::HeaderView;
|
||||
|
||||
use io::IoChannel;
|
||||
use util::hash::H256;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
// helper trait for transforming hashes to numbers and checking if syncing.
|
||||
trait Oracle: Send + Sync {
|
||||
fn to_number(&self, hash: H256) -> Option<u64>;
|
||||
|
||||
fn is_major_syncing(&self) -> bool;
|
||||
}
|
||||
|
||||
impl Oracle for Client {
|
||||
fn to_number(&self, hash: H256) -> Option<u64> {
|
||||
self.block_header(BlockID::Hash(hash)).map(|h| HeaderView::new(&h).number())
|
||||
}
|
||||
|
||||
fn is_major_syncing(&self) -> bool {
|
||||
let queue_info = self.queue_info();
|
||||
|
||||
queue_info.unverified_queue_size + queue_info.verified_queue_size > 3
|
||||
}
|
||||
}
|
||||
|
||||
// helper trait for broadcasting a block to take a snapshot at.
|
||||
trait Broadcast: Send + Sync {
|
||||
fn take_at(&self, num: Option<u64>);
|
||||
}
|
||||
|
||||
impl Broadcast for IoChannel<ClientIoMessage> {
|
||||
fn take_at(&self, num: Option<u64>) {
|
||||
let num = match num {
|
||||
Some(n) => n,
|
||||
None => return,
|
||||
};
|
||||
|
||||
trace!(target: "snapshot_watcher", "broadcast: {}", num);
|
||||
|
||||
if let Err(e) = self.send(ClientIoMessage::TakeSnapshot(num)) {
|
||||
warn!("Snapshot watcher disconnected from IoService: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A `ChainNotify` implementation which will trigger a snapshot event
|
||||
/// at certain block numbers.
|
||||
pub struct Watcher {
|
||||
oracle: Arc<Oracle>,
|
||||
broadcast: Box<Broadcast>,
|
||||
period: u64,
|
||||
history: u64,
|
||||
}
|
||||
|
||||
impl Watcher {
|
||||
/// Create a new `Watcher` which will trigger a snapshot event
|
||||
/// once every `period` blocks, but only after that block is
|
||||
/// `history` blocks old.
|
||||
pub fn new(client: Arc<Client>, channel: IoChannel<ClientIoMessage>, period: u64, history: u64) -> Self {
|
||||
Watcher {
|
||||
oracle: client,
|
||||
broadcast: Box::new(channel),
|
||||
period: period,
|
||||
history: history,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ChainNotify for Watcher {
|
||||
fn new_blocks(
|
||||
&self,
|
||||
imported: Vec<H256>,
|
||||
_: Vec<H256>,
|
||||
_: Vec<H256>,
|
||||
_: Vec<H256>,
|
||||
_: Vec<H256>,
|
||||
_duration: u64)
|
||||
{
|
||||
if self.oracle.is_major_syncing() { return }
|
||||
|
||||
trace!(target: "snapshot_watcher", "{} imported", imported.len());
|
||||
|
||||
let highest = imported.into_iter()
|
||||
.filter_map(|h| self.oracle.to_number(h))
|
||||
.filter(|&num| num >= self.period + self.history)
|
||||
.map(|num| num - self.history)
|
||||
.filter(|num| num % self.period == 0)
|
||||
.fold(0, ::std::cmp::max);
|
||||
|
||||
match highest {
|
||||
0 => self.broadcast.take_at(None),
|
||||
_ => self.broadcast.take_at(Some(highest)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{Broadcast, Oracle, Watcher};
|
||||
|
||||
use client::ChainNotify;
|
||||
|
||||
use util::{H256, U256};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
struct TestOracle(HashMap<H256, u64>);
|
||||
|
||||
impl Oracle for TestOracle {
|
||||
fn to_number(&self, hash: H256) -> Option<u64> {
|
||||
self.0.get(&hash).cloned()
|
||||
}
|
||||
|
||||
fn is_major_syncing(&self) -> bool { false }
|
||||
}
|
||||
|
||||
struct TestBroadcast(Option<u64>);
|
||||
impl Broadcast for TestBroadcast {
|
||||
fn take_at(&self, num: Option<u64>) {
|
||||
if num != self.0 {
|
||||
panic!("Watcher broadcast wrong number. Expected {:?}, found {:?}", self.0, num);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// helper harness for tests which expect a notification.
|
||||
fn harness(numbers: Vec<u64>, period: u64, history: u64, expected: Option<u64>) {
|
||||
let hashes: Vec<_> = numbers.clone().into_iter().map(|x| H256::from(U256::from(x))).collect();
|
||||
let map = hashes.clone().into_iter().zip(numbers).collect();
|
||||
|
||||
let watcher = Watcher {
|
||||
oracle: Arc::new(TestOracle(map)),
|
||||
broadcast: Box::new(TestBroadcast(expected)),
|
||||
period: period,
|
||||
history: history,
|
||||
};
|
||||
|
||||
watcher.new_blocks(
|
||||
hashes,
|
||||
vec![],
|
||||
vec![],
|
||||
vec![],
|
||||
vec![],
|
||||
0,
|
||||
);
|
||||
}
|
||||
|
||||
// helper
|
||||
|
||||
#[test]
|
||||
fn should_not_fire() {
|
||||
harness(vec![0], 5, 0, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fires_once_for_two() {
|
||||
harness(vec![14, 15], 10, 5, Some(10));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn finds_highest() {
|
||||
harness(vec![15, 25], 10, 5, Some(20));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn doesnt_fire_before_history() {
|
||||
harness(vec![10, 11], 10, 5, None);
|
||||
}
|
||||
}
|
||||
@@ -31,3 +31,5 @@ pub mod trace_filter;
|
||||
pub mod call_analytics;
|
||||
pub mod transaction_import;
|
||||
pub mod block_import_error;
|
||||
pub mod restoration_status;
|
||||
pub mod snapshot_manifest;
|
||||
|
||||
34
ethcore/src/types/restoration_status.rs
Normal file
34
ethcore/src/types/restoration_status.rs
Normal file
@@ -0,0 +1,34 @@
|
||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Restoration status type definition
|
||||
|
||||
/// Statuses for restorations.
|
||||
#[derive(PartialEq, Eq, Clone, Copy, Debug, Binary)]
|
||||
pub enum RestorationStatus {
|
||||
/// No restoration.
|
||||
Inactive,
|
||||
/// Ongoing restoration.
|
||||
Ongoing {
|
||||
/// Number of state chunks completed.
|
||||
state_chunks_done: u32,
|
||||
/// Number of block chunks completed.
|
||||
block_chunks_done: u32,
|
||||
},
|
||||
/// Failed restoration.
|
||||
Failed,
|
||||
}
|
||||
|
||||
70
ethcore/src/types/snapshot_manifest.rs
Normal file
70
ethcore/src/types/snapshot_manifest.rs
Normal file
@@ -0,0 +1,70 @@
|
||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Snapshot manifest type definition
|
||||
|
||||
use util::hash::H256;
|
||||
use rlp::*;
|
||||
use util::Bytes;
|
||||
|
||||
/// Manifest data.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Binary)]
|
||||
pub struct ManifestData {
|
||||
/// List of state chunk hashes.
|
||||
pub state_hashes: Vec<H256>,
|
||||
/// List of block chunk hashes.
|
||||
pub block_hashes: Vec<H256>,
|
||||
/// The final, expected state root.
|
||||
pub state_root: H256,
|
||||
/// Block number this snapshot was taken at.
|
||||
pub block_number: u64,
|
||||
/// Block hash this snapshot was taken at.
|
||||
pub block_hash: H256,
|
||||
}
|
||||
|
||||
impl ManifestData {
|
||||
/// Encode the manifest data to rlp.
|
||||
pub fn into_rlp(self) -> Bytes {
|
||||
let mut stream = RlpStream::new_list(5);
|
||||
stream.append(&self.state_hashes);
|
||||
stream.append(&self.block_hashes);
|
||||
stream.append(&self.state_root);
|
||||
stream.append(&self.block_number);
|
||||
stream.append(&self.block_hash);
|
||||
|
||||
stream.out()
|
||||
}
|
||||
|
||||
/// Try to restore manifest data from raw bytes, interpreted as RLP.
|
||||
pub fn from_rlp(raw: &[u8]) -> Result<Self, DecoderError> {
|
||||
let decoder = UntrustedRlp::new(raw);
|
||||
|
||||
let state_hashes: Vec<H256> = try!(decoder.val_at(0));
|
||||
let block_hashes: Vec<H256> = try!(decoder.val_at(1));
|
||||
let state_root: H256 = try!(decoder.val_at(2));
|
||||
let block_number: u64 = try!(decoder.val_at(3));
|
||||
let block_hash: H256 = try!(decoder.val_at(4));
|
||||
|
||||
Ok(ManifestData {
|
||||
state_hashes: state_hashes,
|
||||
block_hashes: block_hashes,
|
||||
state_root: state_root,
|
||||
block_number: block_number,
|
||||
block_hash: block_hash,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ pub use self::canon_verifier::CanonVerifier;
|
||||
pub use self::noop_verifier::NoopVerifier;
|
||||
|
||||
/// Verifier type.
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub enum VerifierType {
|
||||
/// Verifies block normally.
|
||||
Canon,
|
||||
|
||||
Reference in New Issue
Block a user