State db sync
This commit is contained in:
parent
d3a16574d6
commit
8f631a7c60
@ -1,4 +1,5 @@
|
||||
use util::*;
|
||||
use rocksdb::{DB};
|
||||
use blockchain::{BlockChain, BlockProvider};
|
||||
use views::BlockView;
|
||||
use error::*;
|
||||
@ -7,6 +8,9 @@ use spec::Spec;
|
||||
use engine::Engine;
|
||||
use queue::BlockQueue;
|
||||
use sync::NetSyncMessage;
|
||||
use env_info::LastHashes;
|
||||
use verification::*;
|
||||
use block::*;
|
||||
|
||||
/// General block status
|
||||
pub enum BlockStatus {
|
||||
@ -95,7 +99,8 @@ pub trait BlockChainClient : Sync {
|
||||
/// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue.
|
||||
pub struct Client {
|
||||
chain: Arc<RwLock<BlockChain>>,
|
||||
_engine: Arc<Box<Engine>>,
|
||||
engine: Arc<Box<Engine>>,
|
||||
state_db: OverlayDB,
|
||||
queue: BlockQueue,
|
||||
}
|
||||
|
||||
@ -103,15 +108,56 @@ impl Client {
|
||||
pub fn new(spec: Spec, path: &Path, message_channel: IoChannel<NetSyncMessage> ) -> Result<Client, Error> {
|
||||
let chain = Arc::new(RwLock::new(BlockChain::new(&spec.genesis_block(), path)));
|
||||
let engine = Arc::new(try!(spec.to_engine()));
|
||||
let mut state_path = path.to_path_buf();
|
||||
state_path.push("state");
|
||||
let db = DB::open_default(state_path.to_str().unwrap()).unwrap();
|
||||
|
||||
Ok(Client {
|
||||
chain: chain.clone(),
|
||||
_engine: engine.clone(),
|
||||
queue: BlockQueue::new(chain.clone(), engine.clone(), message_channel),
|
||||
engine: engine.clone(),
|
||||
state_db: OverlayDB::new(db),
|
||||
queue: BlockQueue::new(engine.clone(), message_channel),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
pub fn import_verified_block(&mut self, bytes: Bytes) {
|
||||
let block = BlockView::new(&bytes);
|
||||
let header = block.header_view();
|
||||
if let Err(e) = verify_block_final(&bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) {
|
||||
warn!(target: "client", "Stage 3 block verification failed for {}\nError: {:?}", header.sha3(), e);
|
||||
// TODO: mark as bad
|
||||
return;
|
||||
};
|
||||
let parent = match self.chain.read().unwrap().block_header(&header.parent_hash()) {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
warn!(target: "client", "Stage 3 import failed for {}: Parent not found ({}) ", header.sha3(), header.parent_hash());
|
||||
return;
|
||||
},
|
||||
};
|
||||
// build last hashes
|
||||
let mut last = self.chain.read().unwrap().best_block_hash();
|
||||
let mut last_hashes = LastHashes::new();
|
||||
last_hashes.resize(256, H256::new());
|
||||
for i in 0..255 {
|
||||
match self.chain.read().unwrap().block_details(&last) {
|
||||
Some(details) => {
|
||||
last_hashes[i + 1] = details.parent.clone();
|
||||
last = details.parent.clone();
|
||||
},
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
let mut b = OpenBlock::new(self.engine.deref().deref(), self.state_db.clone(), &parent, &last_hashes, header.author(), header.extra_data());
|
||||
|
||||
for t in block.transactions().into_iter() {
|
||||
if let Err(e) = b.push_transaction(t.clone(), None) {
|
||||
warn!(target: "client", "Stage 3 transaction import failed for block {}\nTransaction:{:?}\nError: {:?}", header.sha3(), t, e);
|
||||
return;
|
||||
};
|
||||
}
|
||||
self.chain.write().unwrap().insert_block(&bytes);
|
||||
}
|
||||
}
|
||||
@ -171,6 +217,10 @@ impl BlockChainClient for Client {
|
||||
}
|
||||
|
||||
fn import_block(&mut self, bytes: &[u8]) -> ImportResult {
|
||||
let header = BlockView::new(bytes).header();
|
||||
if self.chain.read().unwrap().is_known(&header.hash()) {
|
||||
return Err(ImportError::AlreadyInChain);
|
||||
}
|
||||
self.queue.import_block(bytes)
|
||||
}
|
||||
|
||||
|
11
src/queue.rs
11
src/queue.rs
@ -1,6 +1,4 @@
|
||||
use util::*;
|
||||
use blockchain::*;
|
||||
use views::{BlockView};
|
||||
use verification::*;
|
||||
use error::*;
|
||||
use engine::Engine;
|
||||
@ -9,16 +7,14 @@ use sync::*;
|
||||
/// A queue of blocks. Sits between network or other I/O and the BlockChain.
|
||||
/// Sorts them ready for blockchain insertion.
|
||||
pub struct BlockQueue {
|
||||
bc: Arc<RwLock<BlockChain>>,
|
||||
engine: Arc<Box<Engine>>,
|
||||
message_channel: IoChannel<NetSyncMessage>
|
||||
}
|
||||
|
||||
impl BlockQueue {
|
||||
/// Creates a new queue instance.
|
||||
pub fn new(bc: Arc<RwLock<BlockChain>>, engine: Arc<Box<Engine>>, message_channel: IoChannel<NetSyncMessage>) -> BlockQueue {
|
||||
pub fn new(engine: Arc<Box<Engine>>, message_channel: IoChannel<NetSyncMessage>) -> BlockQueue {
|
||||
BlockQueue {
|
||||
bc: bc,
|
||||
engine: engine,
|
||||
message_channel: message_channel
|
||||
}
|
||||
@ -30,13 +26,8 @@ impl BlockQueue {
|
||||
|
||||
/// Add a block to the queue.
|
||||
pub fn import_block(&mut self, bytes: &[u8]) -> ImportResult {
|
||||
let header = BlockView::new(bytes).header();
|
||||
if self.bc.read().unwrap().is_known(&header.hash()) {
|
||||
return Err(ImportError::AlreadyInChain);
|
||||
}
|
||||
try!(verify_block_basic(bytes, self.engine.deref().deref()));
|
||||
try!(verify_block_unordered(bytes, self.engine.deref().deref()));
|
||||
try!(verify_block_final(bytes, self.engine.deref().deref(), self.bc.read().unwrap().deref()));
|
||||
try!(self.message_channel.send(UserMessage(SyncMessage::BlockVerified(bytes.to_vec()))).map_err(|e| Error::from(e)));
|
||||
Ok(())
|
||||
}
|
||||
|
@ -38,6 +38,7 @@ pub fn verify_block_unordered(bytes: &[u8], engine: &Engine) -> Result<(), Error
|
||||
|
||||
/// Phase 3 verification. Check block information against parent and uncles.
|
||||
pub fn verify_block_final<BC>(bytes: &[u8], engine: &Engine, bc: &BC) -> Result<(), Error> where BC: BlockProvider {
|
||||
// TODO: verify timestamp
|
||||
let block = BlockView::new(bytes);
|
||||
let header = block.header();
|
||||
let parent = try!(bc.block_header(&header.parent_hash).ok_or::<Error>(From::from(BlockError::UnknownParent(header.parent_hash.clone()))));
|
||||
|
Loading…
Reference in New Issue
Block a user