From 8f631a7c60c4a5c447238df8ddbe086de01a6695 Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 14 Jan 2016 01:28:37 +0100 Subject: [PATCH] State db sync --- src/client.rs | 56 ++++++++++++++++++++++++++++++++++++++++++--- src/queue.rs | 11 +-------- src/verification.rs | 1 + 3 files changed, 55 insertions(+), 13 deletions(-) diff --git a/src/client.rs b/src/client.rs index f5a4b43a6..bb7c324d0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,5 @@ use util::*; +use rocksdb::{DB}; use blockchain::{BlockChain, BlockProvider}; use views::BlockView; use error::*; @@ -7,6 +8,9 @@ use spec::Spec; use engine::Engine; use queue::BlockQueue; use sync::NetSyncMessage; +use env_info::LastHashes; +use verification::*; +use block::*; /// General block status pub enum BlockStatus { @@ -95,7 +99,8 @@ pub trait BlockChainClient : Sync { /// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue. pub struct Client { chain: Arc>, - _engine: Arc>, + engine: Arc>, + state_db: OverlayDB, queue: BlockQueue, } @@ -103,15 +108,56 @@ impl Client { pub fn new(spec: Spec, path: &Path, message_channel: IoChannel ) -> Result { let chain = Arc::new(RwLock::new(BlockChain::new(&spec.genesis_block(), path))); let engine = Arc::new(try!(spec.to_engine())); + let mut state_path = path.to_path_buf(); + state_path.push("state"); + let db = DB::open_default(state_path.to_str().unwrap()).unwrap(); + Ok(Client { chain: chain.clone(), - _engine: engine.clone(), - queue: BlockQueue::new(chain.clone(), engine.clone(), message_channel), + engine: engine.clone(), + state_db: OverlayDB::new(db), + queue: BlockQueue::new(engine.clone(), message_channel), }) } pub fn import_verified_block(&mut self, bytes: Bytes) { + let block = BlockView::new(&bytes); + let header = block.header_view(); + if let Err(e) = verify_block_final(&bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) { + warn!(target: "client", "Stage 3 block verification failed for {}\nError: {:?}", header.sha3(), e); + // TODO: mark as bad + return; + }; + let parent = match self.chain.read().unwrap().block_header(&header.parent_hash()) { + Some(p) => p, + None => { + warn!(target: "client", "Stage 3 import failed for {}: Parent not found ({}) ", header.sha3(), header.parent_hash()); + return; + }, + }; + // build last hashes + let mut last = self.chain.read().unwrap().best_block_hash(); + let mut last_hashes = LastHashes::new(); + last_hashes.resize(256, H256::new()); + for i in 0..255 { + match self.chain.read().unwrap().block_details(&last) { + Some(details) => { + last_hashes[i + 1] = details.parent.clone(); + last = details.parent.clone(); + }, + None => break, + } + } + + let mut b = OpenBlock::new(self.engine.deref().deref(), self.state_db.clone(), &parent, &last_hashes, header.author(), header.extra_data()); + + for t in block.transactions().into_iter() { + if let Err(e) = b.push_transaction(t.clone(), None) { + warn!(target: "client", "Stage 3 transaction import failed for block {}\nTransaction:{:?}\nError: {:?}", header.sha3(), t, e); + return; + }; + } self.chain.write().unwrap().insert_block(&bytes); } } @@ -171,6 +217,10 @@ impl BlockChainClient for Client { } fn import_block(&mut self, bytes: &[u8]) -> ImportResult { + let header = BlockView::new(bytes).header(); + if self.chain.read().unwrap().is_known(&header.hash()) { + return Err(ImportError::AlreadyInChain); + } self.queue.import_block(bytes) } diff --git a/src/queue.rs b/src/queue.rs index 60026e818..f02a4ea72 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,6 +1,4 @@ use util::*; -use blockchain::*; -use views::{BlockView}; use verification::*; use error::*; use engine::Engine; @@ -9,16 +7,14 @@ use sync::*; /// A queue of blocks. Sits between network or other I/O and the BlockChain. /// Sorts them ready for blockchain insertion. pub struct BlockQueue { - bc: Arc>, engine: Arc>, message_channel: IoChannel } impl BlockQueue { /// Creates a new queue instance. - pub fn new(bc: Arc>, engine: Arc>, message_channel: IoChannel) -> BlockQueue { + pub fn new(engine: Arc>, message_channel: IoChannel) -> BlockQueue { BlockQueue { - bc: bc, engine: engine, message_channel: message_channel } @@ -30,13 +26,8 @@ impl BlockQueue { /// Add a block to the queue. pub fn import_block(&mut self, bytes: &[u8]) -> ImportResult { - let header = BlockView::new(bytes).header(); - if self.bc.read().unwrap().is_known(&header.hash()) { - return Err(ImportError::AlreadyInChain); - } try!(verify_block_basic(bytes, self.engine.deref().deref())); try!(verify_block_unordered(bytes, self.engine.deref().deref())); - try!(verify_block_final(bytes, self.engine.deref().deref(), self.bc.read().unwrap().deref())); try!(self.message_channel.send(UserMessage(SyncMessage::BlockVerified(bytes.to_vec()))).map_err(|e| Error::from(e))); Ok(()) } diff --git a/src/verification.rs b/src/verification.rs index 5dabcf84f..2c3fe130d 100644 --- a/src/verification.rs +++ b/src/verification.rs @@ -38,6 +38,7 @@ pub fn verify_block_unordered(bytes: &[u8], engine: &Engine) -> Result<(), Error /// Phase 3 verification. Check block information against parent and uncles. pub fn verify_block_final(bytes: &[u8], engine: &Engine, bc: &BC) -> Result<(), Error> where BC: BlockProvider { + // TODO: verify timestamp let block = BlockView::new(bytes); let header = block.header(); let parent = try!(bc.block_header(&header.parent_hash).ok_or::(From::from(BlockError::UnknownParent(header.parent_hash.clone()))));