diff --git a/.gitignore b/.gitignore index 5fc1b92c5..8b4a3b588 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,6 @@ Cargo.lock # mac stuff .DS_Store + +# gdb files +.gdb_history diff --git a/ b/ new file mode 100644 index 000000000..e69de29bb diff --git a/src/bin/client.rs b/src/bin/client.rs new file mode 100644 index 000000000..a644ecd1b --- /dev/null +++ b/src/bin/client.rs @@ -0,0 +1,32 @@ +extern crate ethcore_util as util; +extern crate ethcore; +extern crate rustc_serialize; +extern crate env_logger; + +use std::io::*; +use std::env; +use std::sync::Arc; +use util::hash::*; +use util::network::{NetworkService}; +use ethcore::client::Client; +use ethcore::sync::EthSync; +use ethcore::ethereum; + +fn main() { + ::env_logger::init().ok(); + let mut service = NetworkService::start().unwrap(); + //TODO: replace with proper genesis and chain params. + let spec = ethereum::new_frontier(); + let mut dir = env::temp_dir(); + dir.push(H32::random().hex()); + let client = Arc::new(Client::new(spec, &dir).unwrap()); + EthSync::register(&mut service, client); + loop { + let mut cmd = String::new(); + stdin().read_line(&mut cmd).unwrap(); + if cmd == "quit\n" || cmd == "exit\n" || cmd == "q\n" { + break; + } + } +} + diff --git a/src/blockchain.rs b/src/blockchain.rs index be2b8ce68..c102ba1b4 100644 --- a/src/blockchain.rs +++ b/src/blockchain.rs @@ -8,7 +8,7 @@ use transaction::*; use views::*; /// Represents a tree route between `from` block and `to` block: -/// +/// /// - `blocks` - a vector of hashes of all blocks, ordered from `from` to `to`. /// /// - `ancestor` - best common ancestor of these blocks. 
@@ -33,7 +33,7 @@ pub struct CacheSize { /// Information about best block gathered together struct BestBlock { pub hash: H256, - pub number: usize, + pub number: BlockNumber, pub total_difficulty: U256 } @@ -41,27 +41,27 @@ impl BestBlock { fn new() -> BestBlock { BestBlock { hash: H256::new(), - number: 0usize, + number: 0, total_difficulty: U256::from(0) } } } /// Structure providing fast access to blockchain data. -/// +/// /// **Does not do input data verification.** pub struct BlockChain { - best_block: RefCell, + best_block: RwLock, // block cache - blocks: RefCell>, + blocks: RwLock>, // extra caches - block_details: RefCell>, - block_hashes: RefCell>, - transaction_addresses: RefCell>, - block_logs: RefCell>, - blocks_blooms: RefCell>, + block_details: RwLock>, + block_hashes: RwLock>, + transaction_addresses: RwLock>, + block_logs: RwLock>, + blocks_blooms: RwLock>, extras_db: DB, blocks_db: DB @@ -69,7 +69,7 @@ pub struct BlockChain { impl BlockChain { /// Create new instance of blockchain from given Genesis - /// + /// /// ```rust /// extern crate ethcore_util as util; /// extern crate ethcore; @@ -80,7 +80,7 @@ impl BlockChain { /// use ethcore::ethereum; /// use util::hash::*; /// use util::uint::*; - /// + /// /// fn main() { /// let spec = ethereum::new_frontier(); /// @@ -107,13 +107,13 @@ impl BlockChain { let blocks_db = DB::open_default(blocks_path.to_str().unwrap()).unwrap(); let bc = BlockChain { - best_block: RefCell::new(BestBlock::new()), - blocks: RefCell::new(HashMap::new()), - block_details: RefCell::new(HashMap::new()), - block_hashes: RefCell::new(HashMap::new()), - transaction_addresses: RefCell::new(HashMap::new()), - block_logs: RefCell::new(HashMap::new()), - blocks_blooms: RefCell::new(HashMap::new()), + best_block: RwLock::new(BestBlock::new()), + blocks: RwLock::new(HashMap::new()), + block_details: RwLock::new(HashMap::new()), + block_hashes: RwLock::new(HashMap::new()), + transaction_addresses: RwLock::new(HashMap::new()), + 
block_logs: RwLock::new(HashMap::new()), + blocks_blooms: RwLock::new(HashMap::new()), extras_db: extras_db, blocks_db: blocks_db }; @@ -142,13 +142,13 @@ impl BlockChain { batch.put_extras(&header.number(), &hash); batch.put(b"best", &hash).unwrap(); bc.extras_db.write(batch).unwrap(); - + hash } }; { - let mut best_block = bc.best_block.borrow_mut(); + let mut best_block = bc.best_block.write().unwrap(); best_block.number = bc.block_number(&best_block_hash).unwrap(); best_block.total_difficulty = bc.block_details(&best_block_hash).unwrap().total_difficulty; best_block.hash = best_block_hash; @@ -158,52 +158,57 @@ impl BlockChain { } /// Returns a tree route between `from` and `to`, which is a tuple of: - /// + /// /// - a vector of hashes of all blocks, ordered from `from` to `to`. /// /// - common ancestor of these blocks. /// /// - an index where best common ancestor would be - /// + /// /// 1.) from newer to older - /// + /// /// - bc: `A1 -> A2 -> A3 -> A4 -> A5` /// - from: A5, to: A4 - /// - route: + /// - route: /// /// ```json /// { blocks: [A5], ancestor: A4, index: 1 } /// ``` - /// + /// /// 2.) from older to newer - /// + /// /// - bc: `A1 -> A2 -> A3 -> A4 -> A5` /// - from: A3, to: A4 - /// - route: - /// + /// - route: + /// /// ```json /// { blocks: [A4], ancestor: A3, index: 0 } /// ``` /// /// 3.) 
fork: /// - /// - bc: + /// - bc: /// /// ```text /// A1 -> A2 -> A3 -> A4 /// -> B3 -> B4 - /// ``` + /// ``` /// - from: B4, to: A4 - /// - route: - /// + /// - route: + /// /// ```json /// { blocks: [B4, B3, A3, A4], ancestor: A2, index: 2 } /// ``` - pub fn tree_route(&self, from: H256, to: H256) -> TreeRoute { - let from_details = self.block_details(&from).expect("from hash is invalid!"); - let to_details = self.block_details(&to).expect("to hash is invalid!"); - - self._tree_route((from_details, from), (to_details, to)) + pub fn tree_route(&self, from: H256, to: H256) -> Option { + let from_details = match self.block_details(&from) { + Some(h) => h, + None => return None, + }; + let to_details = match self.block_details(&to) { + Some(h) => h, + None => return None, + }; + Some(self._tree_route((from_details, from), (to_details, to))) } /// Similar to `tree_route` function, but can be used to return a route @@ -262,7 +267,7 @@ impl BlockChain { // create views onto rlp let block = BlockView::new(bytes); let header = block.header_view(); - let hash = block.sha3(); + let hash = header.sha3(); if self.is_known(&hash) { return; @@ -273,13 +278,13 @@ impl BlockChain { let (batch, new_best) = self.block_to_extras_insert_batch(bytes); // update best block - let mut best_block = self.best_block.borrow_mut(); + let mut best_block = self.best_block.write().unwrap(); if let Some(b) = new_best { *best_block = b; } // update caches - let mut write = self.block_details.borrow_mut(); + let mut write = self.block_details.write().unwrap(); write.remove(&header.parent_hash()); // update extras database @@ -292,7 +297,7 @@ impl BlockChain { // create views onto rlp let block = BlockView::new(bytes); let header = block.header_view(); - + // prepare variables let hash = block.sha3(); let mut parent_details = self.block_details(&header.parent_hash()).expect("Invalid parent hash."); @@ -307,7 +312,7 @@ impl BlockChain { parent: parent_hash.clone(), children: vec![] }; - + // prepare 
the batch let batch = WriteBatch::new(); @@ -323,7 +328,7 @@ impl BlockChain { return (batch, None); } - // if its new best block we need to make sure that all ancestors + // if its new best block we need to make sure that all ancestors // are moved to "canon chain" // find the route between old best block and the new one let best_hash = self.best_block_hash(); @@ -338,7 +343,7 @@ impl BlockChain { let ancestor_number = self.block_number(&route.ancestor).unwrap(); let start_number = ancestor_number + 1; for (index, hash) in route.blocks.iter().skip(route.index).enumerate() { - batch.put_extras(&(start_number + index), hash); + batch.put_extras(&(start_number + index as BlockNumber), hash); } }, // route.blocks.len() could be 0 only if inserted block is best block, @@ -358,7 +363,7 @@ impl BlockChain { (batch, Some(best_block)) } - /// Returns true if the given block is known + /// Returns true if the given block is known /// (though not necessarily a part of the canon chain). pub fn is_known(&self, hash: &H256) -> bool { self.query_extras_exist(hash, &self.block_details) @@ -409,27 +414,27 @@ impl BlockChain { } /// Get the hash of given block's number. - pub fn block_hash(&self, index: usize) -> Option { + pub fn block_hash(&self, index: BlockNumber) -> Option { self.query_extras(&index, &self.block_hashes) } /// Get best block hash. pub fn best_block_hash(&self) -> H256 { - self.best_block.borrow().hash.clone() + self.best_block.read().unwrap().hash.clone() } /// Get best block number. - pub fn best_block_number(&self) -> usize { - self.best_block.borrow().number + pub fn best_block_number(&self) -> BlockNumber { + self.best_block.read().unwrap().number } /// Get best block total difficulty. pub fn best_block_total_difficulty(&self) -> U256 { - self.best_block.borrow().total_difficulty + self.best_block.read().unwrap().total_difficulty } /// Get the number of given block's hash. 
- pub fn block_number(&self, hash: &H256) -> Option { + pub fn block_number(&self, hash: &H256) -> Option { self.block(hash).map(|bytes| BlockView::new(&bytes).header_view().number()) } @@ -438,9 +443,10 @@ impl BlockChain { self.query_extras(hash, &self.block_logs) } - fn block(&self, hash: &H256) -> Option { + /// Get raw block data + pub fn block(&self, hash: &H256) -> Option { { - let read = self.blocks.borrow(); + let read = self.blocks.read().unwrap(); match read.get(hash) { Some(v) => return Some(v.clone()), None => () @@ -453,7 +459,7 @@ impl BlockChain { match opt { Some(b) => { let bytes: Bytes = b.to_vec(); - let mut write = self.blocks.borrow_mut(); + let mut write = self.blocks.write().unwrap(); write.insert(hash.clone(), bytes.clone()); Some(bytes) }, @@ -461,11 +467,11 @@ impl BlockChain { } } - fn query_extras(&self, hash: &K, cache: &RefCell>) -> Option where - T: Clone + Decodable + ExtrasIndexable, + fn query_extras(&self, hash: &K, cache: &RwLock>) -> Option where + T: Clone + Decodable + ExtrasIndexable, K: ExtrasSliceConvertable + Eq + Hash + Clone { { - let read = cache.borrow(); + let read = cache.read().unwrap(); match read.get(hash) { Some(v) => return Some(v.clone()), None => () @@ -473,17 +479,17 @@ impl BlockChain { } self.extras_db.get_extras(hash).map(| t: T | { - let mut write = cache.borrow_mut(); + let mut write = cache.write().unwrap(); write.insert(hash.clone(), t.clone()); t }) } - fn query_extras_exist(&self, hash: &K, cache: &RefCell>) -> bool where + fn query_extras_exist(&self, hash: &K, cache: &RwLock>) -> bool where K: ExtrasSliceConvertable + Eq + Hash + Clone, T: ExtrasIndexable { { - let read = cache.borrow(); + let read = cache.read().unwrap(); match read.get(hash) { Some(_) => return true, None => () @@ -496,21 +502,21 @@ impl BlockChain { /// Get current cache size. 
pub fn cache_size(&self) -> CacheSize { CacheSize { - blocks: self.blocks.heap_size_of_children(), - block_details: self.block_details.heap_size_of_children(), - transaction_addresses: self.transaction_addresses.heap_size_of_children(), - block_logs: self.block_logs.heap_size_of_children(), - blocks_blooms: self.blocks_blooms.heap_size_of_children() + blocks: self.blocks.read().unwrap().heap_size_of_children(), + block_details: self.block_details.read().unwrap().heap_size_of_children(), + transaction_addresses: self.transaction_addresses.read().unwrap().heap_size_of_children(), + block_logs: self.block_logs.read().unwrap().heap_size_of_children(), + blocks_blooms: self.blocks_blooms.read().unwrap().heap_size_of_children() } } /// Tries to squeeze the cache if its too big. pub fn squeeze_to_fit(&self, size: CacheSize) { - self.blocks.borrow_mut().squeeze(size.blocks); - self.block_details.borrow_mut().squeeze(size.block_details); - self.transaction_addresses.borrow_mut().squeeze(size.transaction_addresses); - self.block_logs.borrow_mut().squeeze(size.block_logs); - self.blocks_blooms.borrow_mut().squeeze(size.blocks_blooms); + self.blocks.write().unwrap().squeeze(size.blocks); + self.block_details.write().unwrap().squeeze(size.block_details); + self.transaction_addresses.write().unwrap().squeeze(size.transaction_addresses); + self.block_logs.write().unwrap().squeeze(size.block_logs); + self.blocks_blooms.write().unwrap().squeeze(size.blocks_blooms); } } @@ -530,7 +536,7 @@ mod tests { dir.push(H32::random().hex()); let bc = BlockChain::new(&genesis, &dir); - + let genesis_hash = H256::from_str("3caa2203f3d7c136c0295ed128a7d31cea520b1ca5e27afe17d0853331798942").unwrap(); assert_eq!(bc.genesis_hash(), genesis_hash.clone()); @@ -539,7 +545,6 @@ mod tests { assert_eq!(bc.block_hash(0), Some(genesis_hash.clone())); assert_eq!(bc.block_hash(1), None); - let first = 
"f90285f90219a03caa2203f3d7c136c0295ed128a7d31cea520b1ca5e27afe17d0853331798942a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347948888f1f195afa192cfee860698584c030f4c9db1a0bac6177a79e910c98d86ec31a09ae37ac2de15b754fd7bed1ba52362c49416bfa0d45893a296c1490a978e0bd321b5f2635d8280365c1fe9f693d65f233e791344a0c7778a7376099ee2e5c455791c1885b5c361b95713fddcbe32d97fd01334d296b90100000000000000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000200000000000000000008000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000200000000000400000000000000000000000000000000000000000000000000000008302000001832fefd882560b845627cb99a00102030405060708091011121314151617181920212223242526272829303132a08ccb2837fb2923bd97e8f2d08ea32012d6e34be018c73e49a0f98843e8f47d5d88e53be49fec01012ef866f864800a82c35094095e7baea6a6c7c4c2dfeb977efac326af552d8785012a05f200801ba0cb088b8d2ff76a7b2c6616c9d02fb6b7a501afbf8b69d7180b09928a1b80b5e4a06448fe7476c606582039bb72a9f6f4b4fad18507b8dfbd00eebbe151cc573cd2c0".from_hex().unwrap(); bc.insert_block(&first); @@ -594,52 +599,52 @@ mod tests { assert_eq!(bc.block_hash(3).unwrap(), b3a_hash); // test trie route - let r0_1 = bc.tree_route(genesis_hash.clone(), b1_hash.clone()); + let r0_1 = bc.tree_route(genesis_hash.clone(), b1_hash.clone()).unwrap(); assert_eq!(r0_1.ancestor, genesis_hash); assert_eq!(r0_1.blocks, [b1_hash.clone()]); assert_eq!(r0_1.index, 0); - let r0_2 = bc.tree_route(genesis_hash.clone(), b2_hash.clone()); + let r0_2 = bc.tree_route(genesis_hash.clone(), b2_hash.clone()).unwrap(); assert_eq!(r0_2.ancestor, genesis_hash); assert_eq!(r0_2.blocks, [b1_hash.clone(), b2_hash.clone()]); assert_eq!(r0_2.index, 0); - let r1_3a = 
bc.tree_route(b1_hash.clone(), b3a_hash.clone()); + let r1_3a = bc.tree_route(b1_hash.clone(), b3a_hash.clone()).unwrap(); assert_eq!(r1_3a.ancestor, b1_hash); assert_eq!(r1_3a.blocks, [b2_hash.clone(), b3a_hash.clone()]); assert_eq!(r1_3a.index, 0); - let r1_3b = bc.tree_route(b1_hash.clone(), b3b_hash.clone()); + let r1_3b = bc.tree_route(b1_hash.clone(), b3b_hash.clone()).unwrap(); assert_eq!(r1_3b.ancestor, b1_hash); assert_eq!(r1_3b.blocks, [b2_hash.clone(), b3b_hash.clone()]); assert_eq!(r1_3b.index, 0); - let r3a_3b = bc.tree_route(b3a_hash.clone(), b3b_hash.clone()); + let r3a_3b = bc.tree_route(b3a_hash.clone(), b3b_hash.clone()).unwrap(); assert_eq!(r3a_3b.ancestor, b2_hash); assert_eq!(r3a_3b.blocks, [b3a_hash.clone(), b3b_hash.clone()]); assert_eq!(r3a_3b.index, 1); - let r1_0 = bc.tree_route(b1_hash.clone(), genesis_hash.clone()); + let r1_0 = bc.tree_route(b1_hash.clone(), genesis_hash.clone()).unwrap(); assert_eq!(r1_0.ancestor, genesis_hash); assert_eq!(r1_0.blocks, [b1_hash.clone()]); assert_eq!(r1_0.index, 1); - let r2_0 = bc.tree_route(b2_hash.clone(), genesis_hash.clone()); + let r2_0 = bc.tree_route(b2_hash.clone(), genesis_hash.clone()).unwrap(); assert_eq!(r2_0.ancestor, genesis_hash); assert_eq!(r2_0.blocks, [b2_hash.clone(), b1_hash.clone()]); assert_eq!(r2_0.index, 2); - - let r3a_1 = bc.tree_route(b3a_hash.clone(), b1_hash.clone()); + + let r3a_1 = bc.tree_route(b3a_hash.clone(), b1_hash.clone()).unwrap(); assert_eq!(r3a_1.ancestor, b1_hash); assert_eq!(r3a_1.blocks, [b3a_hash.clone(), b2_hash.clone()]); assert_eq!(r3a_1.index, 2); - let r3b_1 = bc.tree_route(b3b_hash.clone(), b1_hash.clone()); + let r3b_1 = bc.tree_route(b3b_hash.clone(), b1_hash.clone()).unwrap(); assert_eq!(r3b_1.ancestor, b1_hash); assert_eq!(r3b_1.blocks, [b3b_hash.clone(), b2_hash.clone()]); assert_eq!(r3b_1.index, 2); - let r3b_3a = bc.tree_route(b3b_hash.clone(), b3a_hash.clone()); + let r3b_3a = bc.tree_route(b3b_hash.clone(), b3a_hash.clone()).unwrap(); 
assert_eq!(r3b_3a.ancestor, b2_hash); assert_eq!(r3b_3a.blocks, [b3b_hash.clone(), b3a_hash.clone()]); assert_eq!(r3b_3a.index, 1); diff --git a/src/builtin.rs b/src/builtin.rs index 7f8f0690b..0c1e60d5f 100644 --- a/src/builtin.rs +++ b/src/builtin.rs @@ -12,6 +12,11 @@ pub struct Builtin { pub execute: Box, } +// Rust does not mark closurer that do not capture as Sync +// We promise that all builtins are thread safe since they only operate on given input. +unsafe impl Sync for Builtin {} +unsafe impl Send for Builtin {} + impl fmt::Debug for Builtin { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "") @@ -266,4 +271,4 @@ fn from_json() { let mut o = [255u8; 4]; (*b.execute)(&i[..], &mut o[..]); assert_eq!(i, o); -} \ No newline at end of file +} diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 000000000..b914dba19 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,199 @@ +use util::*; +use blockchain::BlockChain; +use views::BlockView; +use error::*; +use header::BlockNumber; +use spec::Spec; +use engine::Engine; + +/// General block status +pub enum BlockStatus { + /// Part of the blockchain. + InChain, + /// Queued for import. + Queued, + /// Known as bad. + Bad, + /// Unknown. + Unknown, +} + +/// Result of import block operation. +pub type ImportResult = Result<(), ImportError>; + +/// Information about the blockchain gthered together. +pub struct BlockChainInfo { + /// Blockchain difficulty. + pub total_difficulty: U256, + /// Block queue difficulty. + pub pending_total_difficulty: U256, + /// Genesis block hash. + pub genesis_hash: H256, + /// Best blockchain block hash. + pub best_block_hash: H256, + /// Best blockchain block number. + pub best_block_number: BlockNumber +} + +/// Block queue status +pub struct BlockQueueStatus { + pub full: bool, +} + +pub type TreeRoute = ::blockchain::TreeRoute; + +/// Blockchain database client. Owns and manages a blockchain and a block queue. 
+pub trait BlockChainClient : Sync { + /// Get raw block header data by block header hash. + fn block_header(&self, hash: &H256) -> Option; + + /// Get raw block body data by block header hash. + /// Block body is an RLP list of two items: uncles and transactions. + fn block_body(&self, hash: &H256) -> Option; + + /// Get raw block data by block header hash. + fn block(&self, hash: &H256) -> Option; + + /// Get block status by block header hash. + fn block_status(&self, hash: &H256) -> BlockStatus; + + /// Get raw block header data by block number. + fn block_header_at(&self, n: BlockNumber) -> Option; + + /// Get raw block body data by block number. + /// Block body is an RLP list of two items: uncles and transactions. + fn block_body_at(&self, n: BlockNumber) -> Option; + + /// Get raw block data by block number. + fn block_at(&self, n: BlockNumber) -> Option; + + /// Get block status by block number. + fn block_status_at(&self, n: BlockNumber) -> BlockStatus; + + /// Get a tree route between `from` and `to`. + /// See `BlockChain::tree_route`. + fn tree_route(&self, from: &H256, to: &H256) -> Option; + + /// Get latest state node + fn state_data(&self, hash: &H256) -> Option; + + /// Get raw block receipts data by block header hash. + fn block_receipts(&self, hash: &H256) -> Option; + + /// Import a block into the blockchain. + fn import_block(&mut self, byte: &[u8]) -> ImportResult; + + /// Get block queue information. + fn queue_status(&self) -> BlockQueueStatus; + + /// Clear block queue and abort all import activity. + fn clear_queue(&mut self); + + /// Get blockchain information. + fn chain_info(&self) -> BlockChainInfo; +} + +/// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue. 
+pub struct Client { + chain: Arc, + _engine: Arc>, +} + +impl Client { + pub fn new(spec: Spec, path: &Path) -> Result { + let chain = Arc::new(BlockChain::new(&spec.genesis_block(), path)); + let engine = Arc::new(try!(spec.to_engine())); + Ok(Client { + chain: chain.clone(), + _engine: engine, + }) + } +} + +impl BlockChainClient for Client { + fn block_header(&self, hash: &H256) -> Option { + self.chain.block(hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec()) + } + + fn block_body(&self, hash: &H256) -> Option { + self.chain.block(hash).map(|bytes| { + let rlp = Rlp::new(&bytes); + let mut body = RlpStream::new(); + body.append_raw(rlp.at(1).as_raw(), 1); + body.append_raw(rlp.at(2).as_raw(), 1); + body.out() + }) + } + + fn block(&self, hash: &H256) -> Option { + self.chain.block(hash) + } + + fn block_status(&self, hash: &H256) -> BlockStatus { + if self.chain.is_known(&hash) { BlockStatus::InChain } else { BlockStatus::Unknown } + } + + fn block_header_at(&self, n: BlockNumber) -> Option { + self.chain.block_hash(n).and_then(|h| self.block_header(&h)) + } + + fn block_body_at(&self, n: BlockNumber) -> Option { + self.chain.block_hash(n).and_then(|h| self.block_body(&h)) + } + + fn block_at(&self, n: BlockNumber) -> Option { + self.chain.block_hash(n).and_then(|h| self.block(&h)) + } + + fn block_status_at(&self, n: BlockNumber) -> BlockStatus { + match self.chain.block_hash(n) { + Some(h) => self.block_status(&h), + None => BlockStatus::Unknown + } + } + + fn tree_route(&self, from: &H256, to: &H256) -> Option { + self.chain.tree_route(from.clone(), to.clone()) + } + + fn state_data(&self, _hash: &H256) -> Option { + unimplemented!(); + } + + fn block_receipts(&self, _hash: &H256) -> Option { + unimplemented!(); + } + + fn import_block(&mut self, bytes: &[u8]) -> ImportResult { + //TODO: verify block + { + let block = BlockView::new(bytes); + let header = block.header_view(); + let hash = header.sha3(); + if self.chain.is_known(&hash) { 
+ return Err(ImportError::AlreadyInChain); + } + } + self.chain.insert_block(bytes); + Ok(()) + } + + fn queue_status(&self) -> BlockQueueStatus { + BlockQueueStatus { + full: false + } + } + + fn clear_queue(&mut self) { + } + + fn chain_info(&self) -> BlockChainInfo { + BlockChainInfo { + total_difficulty: self.chain.best_block_total_difficulty(), + pending_total_difficulty: self.chain.best_block_total_difficulty(), + genesis_hash: self.chain.genesis_hash(), + best_block_hash: self.chain.best_block_hash(), + best_block_number: From::from(self.chain.best_block_number()) + } + } +} diff --git a/src/engine.rs b/src/engine.rs index bfaf46348..64f85e079 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -4,7 +4,7 @@ use spec::Spec; /// A consensus mechanism for the chain. Generally either proof-of-work or proof-of-stake-based. /// Provides hooks into each of the major parts of block import. -pub trait Engine { +pub trait Engine : Sync + Send { /// The name of this engine. fn name(&self) -> &str; /// The version of this engine. Should be of the form diff --git a/src/env_info.rs b/src/env_info.rs index 2ec096083..47b247f3c 100644 --- a/src/env_info.rs +++ b/src/env_info.rs @@ -1,4 +1,5 @@ use util::*; +use header::BlockNumber; /// Simple vector of hashes, should be at most 256 items large, can be smaller if being used /// for a block whose number is less than 257. @@ -7,7 +8,7 @@ pub type LastHashes = Vec; /// Information concerning the execution environment for a message-call/contract-creation. pub struct EnvInfo { /// The block number. - pub number: usize, + pub number: BlockNumber, /// The block author. pub author: Address, /// The block timestamp. @@ -20,4 +21,4 @@ pub struct EnvInfo { pub last_hashes: LastHashes, /// The gas used. 
pub gas_used: U256, -} \ No newline at end of file +} diff --git a/src/error.rs b/src/error.rs index 68b75ab5b..d975f15b6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -23,6 +23,13 @@ pub enum BlockError { InvalidSealArity(Mismatch), } +#[derive(Debug)] +pub enum ImportError { + Bad(BlockError), + AlreadyInChain, + AlreadyQueued, +} + #[derive(Debug)] /// General error type which should be capable of representing all errors in ethcore. pub enum Error { diff --git a/src/ethereum/mod.rs b/src/ethereum/mod.rs index ed1950d64..b3efe4a3d 100644 --- a/src/ethereum/mod.rs +++ b/src/ethereum/mod.rs @@ -52,7 +52,7 @@ mod tests { fn morden() { let morden = new_morden(); - assert_eq!(*morden.state_root(), H256::from_str("f3f4696bbf3b3b07775128eb7a3763279a394e382130f27c21e70233e04946a9").unwrap()); + assert_eq!(morden.state_root(), H256::from_str("f3f4696bbf3b3b07775128eb7a3763279a394e382130f27c21e70233e04946a9").unwrap()); let genesis = morden.genesis_block(); assert_eq!(BlockView::new(&genesis).header_view().sha3(), H256::from_str("0cd786a2425d16f152c658316c423e6ce1181e15c3295826d7c9904cba9ce303").unwrap()); @@ -63,10 +63,10 @@ mod tests { fn frontier() { let frontier = new_frontier(); - assert_eq!(*frontier.state_root(), H256::from_str("d7f8974fb5ac78d9ac099b9ad5018bedc2ce0a72dad1827a1709da30580f0544").unwrap()); + assert_eq!(frontier.state_root(), H256::from_str("d7f8974fb5ac78d9ac099b9ad5018bedc2ce0a72dad1827a1709da30580f0544").unwrap()); let genesis = frontier.genesis_block(); assert_eq!(BlockView::new(&genesis).header_view().sha3(), H256::from_str("d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3").unwrap()); let _ = frontier.to_engine(); } -} \ No newline at end of file +} diff --git a/src/extras.rs b/src/extras.rs index e94aa809f..ed0032698 100644 --- a/src/extras.rs +++ b/src/extras.rs @@ -1,4 +1,5 @@ use util::*; +use header::BlockNumber; use rocksdb::{DB, Writable}; /// Represents index of extra data in database @@ -75,7 +76,7 @@ impl 
ExtrasSliceConvertable for U256 { } // NICE: make less horrible. -impl ExtrasSliceConvertable for usize { +impl ExtrasSliceConvertable for BlockNumber { fn to_extras_slice(&self, i: ExtrasIndex) -> H264 { U256::from(*self).to_extras_slice(i) } @@ -95,7 +96,7 @@ impl ExtrasIndexable for H256 { /// Familial details concerning a block #[derive(Debug, Clone)] pub struct BlockDetails { - pub number: usize, + pub number: BlockNumber, pub total_difficulty: U256, pub parent: H256, pub children: Vec diff --git a/src/header.rs b/src/header.rs index 74ef169c0..5a274bb5e 100644 --- a/src/header.rs +++ b/src/header.rs @@ -2,6 +2,8 @@ use util::*; use basic_types::*; use time::now_utc; +pub type BlockNumber = u64; + /// A block header. /// /// Reflects the specific RLP fields of a block in the chain with additional room for the seal @@ -13,7 +15,7 @@ pub struct Header { // TODO: make all private. pub parent_hash: H256, pub timestamp: u64, - pub number: usize, + pub number: BlockNumber, pub author: Address, pub transactions_root: H256, @@ -46,23 +48,23 @@ impl Header { number: 0, author: ZERO_ADDRESS.clone(), - transactions_root: ZERO_H256.clone(), - uncles_hash: ZERO_H256.clone(), + transactions_root: SHA3_NULL_RLP, + uncles_hash: SHA3_EMPTY_LIST_RLP, extra_data: vec![], - state_root: ZERO_H256.clone(), - receipts_root: ZERO_H256.clone(), + state_root: SHA3_NULL_RLP, + receipts_root: SHA3_NULL_RLP, log_bloom: ZERO_LOGBLOOM.clone(), - gas_used: ZERO_U256.clone(), - gas_limit: ZERO_U256.clone(), + gas_used: ZERO_U256, + gas_limit: ZERO_U256, - difficulty: ZERO_U256.clone(), + difficulty: ZERO_U256, seal: vec![], hash: RefCell::new(None), } } - pub fn number(&self) -> usize { self.number } + pub fn number(&self) -> BlockNumber { self.number } pub fn timestamp(&self) -> u64 { self.timestamp } pub fn author(&self) -> &Address { &self.author } @@ -72,7 +74,7 @@ impl Header { // TODO: seal_at, set_seal_at &c. 
- pub fn set_number(&mut self, a: usize) { self.number = a; self.note_dirty(); } + pub fn set_number(&mut self, a: BlockNumber) { self.number = a; self.note_dirty(); } pub fn set_timestamp(&mut self, a: u64) { self.timestamp = a; self.note_dirty(); } pub fn set_timestamp_now(&mut self) { self.timestamp = now_utc().to_timespec().sec as u64; self.note_dirty(); } pub fn set_author(&mut self, a: Address) { if a != self.author { self.author = a; self.note_dirty(); } } @@ -133,28 +135,28 @@ impl Header { impl Decodable for Header { fn decode(decoder: &D) -> Result where D: Decoder { - let d = try!(decoder.as_list()); + let r = decoder.as_rlp(); let mut blockheader = Header { - parent_hash: try!(Decodable::decode(&d[0])), - uncles_hash: try!(Decodable::decode(&d[1])), - author: try!(Decodable::decode(&d[2])), - state_root: try!(Decodable::decode(&d[3])), - transactions_root: try!(Decodable::decode(&d[4])), - receipts_root: try!(Decodable::decode(&d[5])), - log_bloom: try!(Decodable::decode(&d[6])), - difficulty: try!(Decodable::decode(&d[7])), - number: try!(Decodable::decode(&d[8])), - gas_limit: try!(Decodable::decode(&d[9])), - gas_used: try!(Decodable::decode(&d[10])), - timestamp: try!(Decodable::decode(&d[11])), - extra_data: try!(Decodable::decode(&d[12])), + parent_hash: try!(r.val_at(0)), + uncles_hash: try!(r.val_at(1)), + author: try!(r.val_at(2)), + state_root: try!(r.val_at(3)), + transactions_root: try!(r.val_at(4)), + receipts_root: try!(r.val_at(5)), + log_bloom: try!(r.val_at(6)), + difficulty: try!(r.val_at(7)), + number: try!(r.val_at(8)), + gas_limit: try!(r.val_at(9)), + gas_used: try!(r.val_at(10)), + timestamp: try!(r.val_at(11)), + extra_data: try!(r.val_at(12)), seal: vec![], - hash: RefCell::new(None), + hash: RefCell::new(Some(r.as_raw().sha3())) }; - for i in 13..d.len() { - blockheader.seal.push(d[i].as_raw().to_vec()); + for i in 13..r.item_count() { + blockheader.seal.push(try!(r.at(i)).as_raw().to_vec()) } Ok(blockheader) @@ -177,7 +179,7 
@@ impl Encodable for Header { self.gas_used.encode(e); self.timestamp.encode(e); self.extra_data.encode(e); - + for b in self.seal.iter() { e.emit_raw(&b); } diff --git a/src/lib.rs b/src/lib.rs index c8695b89d..bcf112c10 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ #![feature(cell_extras)] #![feature(augmented_assignments)] //! Ethcore's ethereum implementation -//! +//! //! ### Rust version //! - beta //! - nightly @@ -9,44 +9,44 @@ //! ### Supported platforms: //! - OSX //! - Linux/Ubuntu -//! +//! //! ### Dependencies: //! - RocksDB 3.13 //! - LLVM 3.7 (optional, required for `jit`) //! - evmjit (optional, required for `jit`) //! //! ### Dependencies Installation -//! +//! //! - OSX -//! +//! //! - rocksdb //! ```bash //! brew install rocksdb //! ``` -//! +//! //! - llvm -//! +//! //! - download llvm 3.7 from http://llvm.org/apt/ //! //! ```bash //! cd llvm-3.7.0.src //! mkdir build && cd $_ -//! cmake -G "Unix Makefiles" .. -DCMAKE_C_FLAGS_RELEASE= -DCMAKE_CXX_FLAGS_RELEASE= -DCMAKE_INSTALL_PREFIX=/usr/local/Cellar/llvm/3.7 -DCMAKE_BUILD_TYPE=Release +//! cmake -G "Unix Makefiles" .. -DCMAKE_C_FLAGS_RELEASE= -DCMAKE_CXX_FLAGS_RELEASE= -DCMAKE_INSTALL_PREFIX=/usr/local/Cellar/llvm/3.7 -DCMAKE_BUILD_TYPE=Release //! make && make install //! ``` //! - evmjit -//! +//! //! - download from https://github.com/debris/evmjit -//! +//! //! ```bash //! cd evmjit //! mkdir build && cd $_ //! cmake -DLLVM_DIR=/usr/local/lib/llvm-3.7/share/llvm/cmake .. //! make && make install //! ``` -//! +//! //! - Linux/Ubuntu -//! +//! //! - rocksdb //! //! ```bash @@ -54,15 +54,15 @@ //! tar xvf rocksdb-3.13.tar.gz && cd rocksdb-rocksdb-3.13 && make shared_lib //! sudo make install //! ``` -//! +//! //! - llvm -//! +//! //! - install using packages from http://llvm.org/apt/ -//! +//! //! - evmjit -//! +//! //! - download from https://github.com/debris/evmjit -//! +//! //! ```bash //! cd evmjit //! 
mkdir build && cd $_ @@ -102,6 +102,7 @@ pub mod spec; pub mod views; pub mod blockchain; pub mod extras; +pub mod client; +pub mod sync; pub mod block; - -pub mod ethereum; \ No newline at end of file +pub mod ethereum; diff --git a/src/queue.rs b/src/queue.rs new file mode 100644 index 000000000..ea212aaf1 --- /dev/null +++ b/src/queue.rs @@ -0,0 +1,35 @@ +use std::sync::Arc; +use util::*; +use blockchain::BlockChain; +use client::{QueueStatus, ImportResult}; +use views::{BlockView}; + +/// A queue of blocks. Sits between network or other I/O and the BlockChain. +/// Sorts them ready for blockchain insertion. +pub struct BlockQueue; + +impl BlockQueue { + /// Creates a new queue instance. + pub fn new() -> BlockQueue { + } + + /// Clear the queue and stop verification activity. + pub fn clear(&mut self) { + } + + /// Add a block to the queue. + pub fn import_block(&mut self, bytes: &[u8], bc: &mut BlockChain) -> ImportResult { + //TODO: verify block + { + let block = BlockView::new(bytes); + let header = block.header_view(); + let hash = header.sha3(); + if self.chain.is_known(&hash) { + return ImportResult::Bad; + } + } + bc.insert_block(bytes); + ImportResult::Queued(QueueStatus::Known) + } +} + diff --git a/src/spec.rs b/src/spec.rs index c106169fe..acdfd2ecf 100644 --- a/src/spec.rs +++ b/src/spec.rs @@ -40,6 +40,27 @@ fn json_to_rlp_map(json: &Json) -> HashMap { }) } +//TODO: add code and data +#[derive(Debug)] +/// Genesis account data. Does no thave a DB overlay cache +pub struct GenesisAccount { + // Balance of the account. + balance: U256, + // Nonce of the account. 
+ nonce: U256, +} + +impl GenesisAccount { + pub fn rlp(&self) -> Bytes { + let mut stream = RlpStream::new_list(4); + stream.append(&self.nonce); + stream.append(&self.balance); + stream.append(&SHA3_NULL_RLP); + stream.append(&SHA3_EMPTY); + stream.out() + } +} + /// Parameters for a block chain; includes both those intrinsic to the design of the /// chain and those to be interpreted by the active chain engine. #[derive(Debug)] @@ -62,12 +83,12 @@ pub struct Spec { pub gas_used: U256, pub timestamp: u64, pub extra_data: Bytes, - pub genesis_state: HashMap, + pub genesis_state: HashMap, pub seal_fields: usize, pub seal_rlp: Bytes, // May be prepopulated if we know this in advance. - state_root_memo: RefCell>, + state_root_memo: RwLock>, } impl Spec { @@ -81,12 +102,12 @@ impl Spec { } } - /// Return the state root for the genesis state, memoising accordingly. - pub fn state_root(&self) -> Ref { - if self.state_root_memo.borrow().is_none() { - *self.state_root_memo.borrow_mut() = Some(sec_trie_root(self.genesis_state.iter().map(|(k, v)| (k.to_vec(), v.rlp())).collect())); + /// Return the state root for the genesis state, memoising accordingly. + pub fn state_root(&self) -> H256 { + if self.state_root_memo.read().unwrap().is_none() { + *self.state_root_memo.write().unwrap() = Some(sec_trie_root(self.genesis_state.iter().map(|(k, v)| (k.to_vec(), v.rlp())).collect())); } - Ref::map(self.state_root_memo.borrow(), |x|x.as_ref().unwrap()) + self.state_root_memo.read().unwrap().as_ref().unwrap().clone() } pub fn genesis_header(&self) -> Header { @@ -149,7 +170,7 @@ impl Spec { // let nonce = if let Some(&Json::String(ref n)) = acc.find("nonce") {U256::from_dec_str(n).unwrap_or(U256::from(0))} else {U256::from(0)}; // TODO: handle code & data if they exist. 
if balance.is_some() || nonce.is_some() { - state.insert(addr, Account::new_basic(balance.unwrap_or(U256::from(0)), nonce.unwrap_or(U256::from(0)))); + state.insert(addr, GenesisAccount { balance: balance.unwrap_or(U256::from(0)), nonce: nonce.unwrap_or(U256::from(0)) }); } } } @@ -186,7 +207,7 @@ impl Spec { genesis_state: state, seal_fields: seal_fields, seal_rlp: seal_rlp, - state_root_memo: RefCell::new(genesis.find("stateRoot").and_then(|_| genesis["stateRoot"].as_string()).map(|s| H256::from_str(&s[2..]).unwrap())), + state_root_memo: RwLock::new(genesis.find("stateRoot").and_then(|_| genesis["stateRoot"].as_string()).map(|s| H256::from_str(&s[2..]).unwrap())), } } @@ -228,10 +249,10 @@ mod tests { fn test_chain() { let test_spec = Spec::new_test(); - assert_eq!(*test_spec.state_root(), H256::from_str("f3f4696bbf3b3b07775128eb7a3763279a394e382130f27c21e70233e04946a9").unwrap()); + assert_eq!(test_spec.state_root(), H256::from_str("f3f4696bbf3b3b07775128eb7a3763279a394e382130f27c21e70233e04946a9").unwrap()); let genesis = test_spec.genesis_block(); assert_eq!(BlockView::new(&genesis).header_view().sha3(), H256::from_str("0cd786a2425d16f152c658316c423e6ce1181e15c3295826d7c9904cba9ce303").unwrap()); let _ = test_spec.to_engine(); } -} \ No newline at end of file +} diff --git a/src/sync/chain.rs b/src/sync/chain.rs new file mode 100644 index 000000000..ffa5d8add --- /dev/null +++ b/src/sync/chain.rs @@ -0,0 +1,970 @@ +/// +/// BlockChain synchronization strategy. +/// Syncs to peers and keeps up to date. +/// This implementation uses ethereum protocol v63 +/// +/// Syncing strategy. +/// +/// 1. A peer arrives with a total difficulty better than ours +/// 2. Find a common best block between our an peer chain. +/// Start with out best block and request headers from peer backwards until a common block is found +/// 3. Download headers and block bodies from peers in parallel. 
+/// As soon as a set of the blocks is fully downloaded at the head of the queue it is fed to the blockchain +/// 4. Maintain sync by handling NewBlocks/NewHashes messages +/// + +use util::*; +use std::mem::{replace}; +use views::{HeaderView}; +use header::{BlockNumber, Header as BlockHeader}; +use client::{BlockChainClient, BlockStatus}; +use sync::range_collection::{RangeCollection, ToUsize, FromUsize}; +use error::*; +use sync::io::SyncIo; + +impl ToUsize for BlockNumber { + fn to_usize(&self) -> usize { + *self as usize + } +} + +impl FromUsize for BlockNumber { + fn from_usize(s: usize) -> BlockNumber { + s as BlockNumber + } +} + +type PacketDecodeError = DecoderError; + +const PROTOCOL_VERSION: u8 = 63u8; +const MAX_BODIES_TO_SEND: usize = 256; +const MAX_HEADERS_TO_SEND: usize = 512; +const MAX_NODE_DATA_TO_SEND: usize = 1024; +const MAX_RECEIPTS_TO_SEND: usize = 1024; +const MAX_HEADERS_TO_REQUEST: usize = 512; +const MAX_BODIES_TO_REQUEST: usize = 256; + +const STATUS_PACKET: u8 = 0x00; +const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; +const TRANSACTIONS_PACKET: u8 = 0x02; +const GET_BLOCK_HEADERS_PACKET: u8 = 0x03; +const BLOCK_HEADERS_PACKET: u8 = 0x04; +const GET_BLOCK_BODIES_PACKET: u8 = 0x05; +const BLOCK_BODIES_PACKET: u8 = 0x06; +const NEW_BLOCK_PACKET: u8 = 0x07; + +const GET_NODE_DATA_PACKET: u8 = 0x0d; +const NODE_DATA_PACKET: u8 = 0x0e; +const GET_RECEIPTS_PACKET: u8 = 0x0f; +const RECEIPTS_PACKET: u8 = 0x10; + +const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent + +struct Header { + /// Header data + data: Bytes, + /// Block hash + hash: H256, + /// Parent hash + parent: H256, +} + +/// Used to identify header by transactions and uncles hashes +#[derive(Eq, PartialEq, Hash)] +struct HeaderId { + transactions_root: H256, + uncles: H256 +} + +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +/// Sync state +pub enum SyncState { + /// Initial chain sync has not started yet + NotSynced, + /// Initial chain sync complete. 
Waiting for new packets + Idle, + /// Block downloading paused. Waiting for block queue to process blocks and free some space + Waiting, + /// Downloading blocks + Blocks, + /// Downloading blocks learned from NewHashes packet + NewBlocks, +} + +/// Syncing status and statistics +pub struct SyncStatus { + /// State + pub state: SyncState, + /// Syncing protocol version. That's the maximum protocol version we connect to. + pub protocol_version: u8, + /// BlockChain height for the moment the sync started. + pub start_block_number: BlockNumber, + /// Last fully downloaded and imported block number. + pub last_imported_block_number: BlockNumber, + /// Highest block number in the download queue. + pub highest_block_number: BlockNumber, + /// Total number of blocks for the sync process. + pub blocks_total: usize, + /// Number of blocks downloaded so far. + pub blocks_received: usize, +} + +#[derive(PartialEq, Eq, Debug)] +/// Peer data type requested +enum PeerAsking { + Nothing, + BlockHeaders, + BlockBodies, +} + +/// Syncing peer information +struct PeerInfo { + /// eth protocol version + protocol_version: u32, + /// Peer chain genesis hash + genesis: H256, + /// Peer network id + network_id: U256, + /// Peer best block hash + latest: H256, + /// Peer total difficulty + difficulty: U256, + /// Type of data currenty being requested from peer. + asking: PeerAsking, + /// A set of block numbers being requested + asking_blocks: Vec, +} + +/// Blockchain sync handler. +/// See module documentation for more details. +pub struct ChainSync { + /// Sync state + state: SyncState, + /// Last block number for the start of sync + starting_block: BlockNumber, + /// Highest block number seen + highest_block: BlockNumber, + /// Set of block header numbers being downloaded + downloading_headers: HashSet, + /// Set of block body numbers being downloaded + downloading_bodies: HashSet, + /// Downloaded headers. + headers: Vec<(BlockNumber, Vec
)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order + /// Downloaded bodies + bodies: Vec<(BlockNumber, Vec)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order + /// Peer info + peers: HashMap, + /// Used to map body to header + header_ids: HashMap, + /// Last impoted block number + last_imported_block: BlockNumber, + /// Last impoted block hash + last_imported_hash: H256, + /// Syncing total difficulty + syncing_difficulty: U256, + /// True if common block for our and remote chain has been found + have_common_block: bool, +} + + +impl ChainSync { + /// Create a new instance of syncing strategy. + pub fn new() -> ChainSync { + ChainSync { + state: SyncState::NotSynced, + starting_block: 0, + highest_block: 0, + downloading_headers: HashSet::new(), + downloading_bodies: HashSet::new(), + headers: Vec::new(), + bodies: Vec::new(), + peers: HashMap::new(), + header_ids: HashMap::new(), + last_imported_block: 0, + last_imported_hash: H256::new(), + syncing_difficulty: U256::from(0u64), + have_common_block: false, + } + } + + /// @returns Synchonization status + pub fn status(&self) -> SyncStatus { + SyncStatus { + state: self.state.clone(), + protocol_version: 63, + start_block_number: self.starting_block, + last_imported_block_number: self.last_imported_block, + highest_block_number: self.highest_block, + blocks_total: (self.last_imported_block - self.starting_block) as usize, + blocks_received: (self.highest_block - self.starting_block) as usize, + } + } + + /// Abort all sync activity + pub fn abort(&mut self, io: &mut SyncIo) { + self.restart(io); + self.peers.clear(); + } + + /// Rest sync. 
Clear all downloaded data but keep the queue + fn reset(&mut self) { + self.downloading_headers.clear(); + self.downloading_bodies.clear(); + self.headers.clear(); + self.bodies.clear(); + for (_, ref mut p) in self.peers.iter_mut() { + p.asking_blocks.clear(); + } + self.header_ids.clear(); + self.syncing_difficulty = From::from(0u64); + self.state = SyncState::Idle; + } + + /// Restart sync + pub fn restart(&mut self, io: &mut SyncIo) { + self.reset(); + self.last_imported_block = 0; + self.last_imported_hash = H256::new(); + self.starting_block = 0; + self.highest_block = 0; + self.have_common_block = false; + io.chain().clear_queue(); + self.starting_block = io.chain().chain_info().best_block_number; + self.state = SyncState::NotSynced; + } + + /// Called by peer to report status + fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let peer = PeerInfo { + protocol_version: try!(r.val_at(0)), + network_id: try!(r.val_at(1)), + difficulty: try!(r.val_at(2)), + latest: try!(r.val_at(3)), + genesis: try!(r.val_at(4)), + asking: PeerAsking::Nothing, + asking_blocks: Vec::new(), + }; + + trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); + + let chain_info = io.chain().chain_info(); + if peer.genesis != chain_info.genesis_hash { + io.disable_peer(peer_id); + trace!(target: "sync", "Peer {} genesis hash not matched", peer_id); + return Ok(()); + } + if peer.network_id != NETWORK_ID { + io.disable_peer(peer_id); + trace!(target: "sync", "Peer {} network id not matched", peer_id); + return Ok(()); + } + + let old = self.peers.insert(peer_id.clone(), peer); + if old.is_some() { + panic!("ChainSync: new peer already exists"); + } + self.sync_peer(io, peer_id, false); + Ok(()) + } + + /// Called by peer once it has new block headers during sync + fn 
on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders); + let item_count = r.item_count(); + trace!(target: "sync", "{} -> BlockHeaders ({} entries)", peer_id, item_count); + self.clear_peer_download(peer_id); + if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { + trace!(target: "sync", "Ignored unexpected block headers"); + return Ok(()); + } + if self.state == SyncState::Waiting { + trace!(target: "sync", "Ignored block headers while waiting"); + return Ok(()); + } + + for i in 0..item_count { + let info: BlockHeader = try!(r.val_at(i)); + let number = BlockNumber::from(info.number); + if number <= self.last_imported_block || self.headers.have_item(&number) { + trace!(target: "sync", "Skipping existing block header"); + continue; + } + if number > self.highest_block { + self.highest_block = number; + } + let hash = info.hash(); + match io.chain().block_status(&hash) { + BlockStatus::InChain => { + self.have_common_block = true; + self.last_imported_block = number; + self.last_imported_hash = hash.clone(); + trace!(target: "sync", "Found common header {} ({})", number, hash); + }, + _ => { + if self.have_common_block { + //validate chain + if self.have_common_block && number == self.last_imported_block + 1 && info.parent_hash != self.last_imported_hash { + // TODO: lower peer rating + debug!(target: "sync", "Mismatched block header {} {}", number, hash); + continue; + } + if self.headers.find_item(&(number - 1)).map_or(false, |p| p.hash != info.parent_hash) { + // mismatching parent id, delete the previous block and don't add this one + // TODO: lower peer rating + debug!(target: "sync", "Mismatched block header {} {}", number, hash); + self.remove_downloaded_blocks(number - 1); + continue; + } + if self.headers.find_item(&(number + 1)).map_or(false, |p| p.parent != hash) { + // 
mismatching parent id for the next block, clear following headers + debug!(target: "sync", "Mismatched block header {}", number + 1); + self.remove_downloaded_blocks(number + 1); + } + } + let hdr = Header { + data: try!(r.at(i)).as_raw().to_vec(), + hash: hash.clone(), + parent: info.parent_hash, + }; + self.headers.insert_item(number, hdr); + let header_id = HeaderId { + transactions_root: info.transactions_root, + uncles: info.uncles_hash + }; + trace!(target: "sync", "Got header {} ({})", number, hash); + if header_id.transactions_root == rlp::SHA3_NULL_RLP && header_id.uncles == rlp::SHA3_EMPTY_LIST_RLP { + //empty body, just mark as downloaded + let mut body_stream = RlpStream::new_list(2); + body_stream.append_raw(&rlp::NULL_RLP, 1); + body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1); + self.bodies.insert_item(number, body_stream.out()); + } + else { + self.header_ids.insert(header_id, number); + } + } + } + } + self.collect_blocks(io); + self.continue_sync(io); + Ok(()) + } + + /// Called by peer once it has new block bodies + fn on_peer_block_bodies(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + use util::triehash::ordered_trie_root; + self.reset_peer_asking(peer_id, PeerAsking::BlockBodies); + let item_count = r.item_count(); + trace!(target: "sync", "{} -> BlockBodies ({} entries)", peer_id, item_count); + self.clear_peer_download(peer_id); + if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { + trace!(target: "sync", "Ignored unexpected block bodies"); + return Ok(()); + } + if self.state == SyncState::Waiting { + trace!(target: "sync", "Ignored block bodies while waiting"); + return Ok(()); + } + for i in 0..item_count { + let body = try!(r.at(i)); + let tx = try!(body.at(0)); + let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw().to_vec()).collect()); //TODO: get rid of vectors here + let uncles = try!(body.at(1)).as_raw().sha3(); + 
let header_id = HeaderId { + transactions_root: tx_root, + uncles: uncles + }; + match self.header_ids.get(&header_id).map(|n| *n) { + Some(n) => { + self.header_ids.remove(&header_id); + self.bodies.insert_item(n, body.as_raw().to_vec()); + trace!(target: "sync", "Got body {}", n); + } + None => { + debug!(target: "sync", "Ignored unknown block body"); + } + } + } + self.collect_blocks(io); + self.continue_sync(io); + Ok(()) + } + + /// Called by peer once it has new block bodies + fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let block_rlp = try!(r.at(0)); + let header_rlp = try!(block_rlp.at(0)); + let h = header_rlp.as_raw().sha3(); + + trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); + let header_view = HeaderView::new(header_rlp.as_raw()); + // TODO: Decompose block and add to self.headers and self.bodies instead + if header_view.number() == From::from(self.last_imported_block + 1) { + match io.chain().import_block(block_rlp.as_raw()) { + Err(ImportError::AlreadyInChain) => { + trace!(target: "sync", "New block already in chain {:?}", h); + }, + Err(ImportError::AlreadyQueued) => { + trace!(target: "sync", "New block already queued {:?}", h); + }, + Ok(()) => { + trace!(target: "sync", "New block queued {:?}", h); + }, + Err(e) => { + debug!(target: "sync", "Bad new block {:?} : {:?}", h, e); + io.disable_peer(peer_id); + } + }; + } + else { + trace!(target: "sync", "New block unknown {:?}", h); + //TODO: handle too many unknown blocks + let difficulty: U256 = try!(r.val_at(1)); + let peer_difficulty = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").difficulty; + if difficulty > peer_difficulty { + trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); + self.sync_peer(io, peer_id, true); + } + } + Ok(()) + } + + /// Handles NewHashes packet. Initiates headers download for any unknown hashes. 
+ fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + if self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking != PeerAsking::Nothing { + trace!(target: "sync", "Ignoring new hashes since we're already downloading."); + return Ok(()); + } + trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()); + let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); + let mut max_height: U256 = From::from(0); + for (rh, rd) in hashes { + let h = try!(rh); + let d = try!(rd); + match io.chain().block_status(&h) { + BlockStatus::InChain => { + trace!(target: "sync", "New block hash already in chain {:?}", h); + }, + BlockStatus::Queued => { + trace!(target: "sync", "New hash block already queued {:?}", h); + }, + BlockStatus::Unknown => { + trace!(target: "sync", "New unknown block hash {:?}", h); + if d > max_height { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + peer.latest = h.clone(); + max_height = d; + } + }, + BlockStatus::Bad =>{ + debug!(target: "sync", "Bad new block hash {:?}", h); + io.disable_peer(peer_id); + return Ok(()); + } + } + }; + Ok(()) + } + + /// Called by peer when it is disconnecting + pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: &PeerId) { + trace!(target: "sync", "== Disconnected {}", peer); + if self.peers.contains_key(&peer) { + self.clear_peer_download(peer); + self.continue_sync(io); + } + } + + /// Called when a new peer is connected + pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: &PeerId) { + trace!(target: "sync", "== Connected {}", peer); + self.send_status(io, peer); + } + + /// Resume downloading + fn continue_sync(&mut self, io: &mut SyncIo) { + let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty)).collect(); + peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating + for (p, _) in peers { + 
self.sync_peer(io, &p, false); + } + } + + /// Called after all blocks have been donloaded + fn complete_sync(&mut self) { + trace!(target: "sync", "Sync complete"); + self.reset(); + self.state = SyncState::Idle; + } + + /// Enter waiting state + fn pause_sync(&mut self) { + trace!(target: "sync", "Block queue full, pausing sync"); + self.state = SyncState::Waiting; + } + + /// Find something to do for a peer. Called for a new peer or when a peer is done with it's task. + fn sync_peer(&mut self, io: &mut SyncIo, peer_id: &PeerId, force: bool) { + let (peer_latest, peer_difficulty) = { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + if peer.asking != PeerAsking::Nothing { + return; + } + if self.state == SyncState::Waiting { + trace!(target: "sync", "Waiting for block queue"); + return; + } + (peer.latest.clone(), peer.difficulty.clone()) + }; + + let td = io.chain().chain_info().pending_total_difficulty; + let syncing_difficulty = max(self.syncing_difficulty, td); + if force || peer_difficulty > syncing_difficulty { + // start sync + self.syncing_difficulty = peer_difficulty; + if self.state == SyncState::Idle || self.state == SyncState::NotSynced { + self.state = SyncState::Blocks; + } + trace!(target: "sync", "Starting sync with better chain"); + self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); + } + else if self.state == SyncState::Blocks { + self.request_blocks(io, peer_id); + } + } + + /// Find some headers or blocks to download for a peer. 
+ fn request_blocks(&mut self, io: &mut SyncIo, peer_id: &PeerId) { + self.clear_peer_download(peer_id); + + if io.chain().queue_status().full { + self.pause_sync(); + return; + } + + // check to see if we need to download any block bodies first + let mut needed_bodies: Vec = Vec::new(); + let mut needed_numbers: Vec = Vec::new(); + + if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.last_imported_block + 1 { + for (start, ref items) in self.headers.range_iter() { + if needed_bodies.len() > MAX_BODIES_TO_REQUEST { + break; + } + let mut index: BlockNumber = 0; + while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST { + let block = start + index; + if !self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block) { + needed_bodies.push(items[index as usize].hash.clone()); + needed_numbers.push(block); + self.downloading_bodies.insert(block); + } + index += 1; + } + } + } + if !needed_bodies.is_empty() { + replace(&mut self.peers.get_mut(peer_id).unwrap().asking_blocks, needed_numbers); + self.request_bodies(io, peer_id, needed_bodies); + } + else { + // check if need to download headers + let mut start = 0usize; + if !self.have_common_block { + // download backwards until common block is found 1 header at a time + let chain_info = io.chain().chain_info(); + start = chain_info.best_block_number as usize; + if !self.headers.is_empty() { + start = min(start, self.headers.range_iter().next().unwrap().0 as usize - 1); + } + if start == 0 { + self.have_common_block = true; //reached genesis + self.last_imported_hash = chain_info.genesis_hash; + } + } + if self.have_common_block { + let mut headers: Vec = Vec::new(); + let mut prev = self.last_imported_block + 1; + for (next, ref items) in self.headers.range_iter() { + if !headers.is_empty() { + break; + } + if next <= prev { + prev = next + items.len() as BlockNumber; + continue; + } + let mut block = prev; + while 
block < next && headers.len() <= MAX_HEADERS_TO_REQUEST { + if !self.downloading_headers.contains(&(block as BlockNumber)) { + headers.push(block as BlockNumber); + self.downloading_headers.insert(block as BlockNumber); + } + block += 1; + } + prev = next + items.len() as BlockNumber; + } + + if !headers.is_empty() { + start = headers[0] as usize; + let count = headers.len(); + replace(&mut self.peers.get_mut(peer_id).unwrap().asking_blocks, headers); + assert!(!self.headers.have_item(&(start as BlockNumber))); + self.request_headers_by_number(io, peer_id, start as BlockNumber, count, 0, false); + } + } + else { + self.request_headers_by_number(io, peer_id, start as BlockNumber, 1, 0, false); + } + } + } + + /// Clear all blocks/headers marked as being downloaded by a peer. + fn clear_peer_download(&mut self, peer_id: &PeerId) { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + for b in &peer.asking_blocks { + self.downloading_headers.remove(&b); + self.downloading_bodies.remove(&b); + } + peer.asking_blocks.clear(); + } + + /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. 
+ fn collect_blocks(&mut self, io: &mut SyncIo) { + if !self.have_common_block || self.headers.is_empty() || self.bodies.is_empty() { + return; + } + + let mut restart = false; + // merge headers and bodies + { + let headers = self.headers.range_iter().next().unwrap(); + let bodies = self.bodies.range_iter().next().unwrap(); + if headers.0 != bodies.0 || headers.0 != self.last_imported_block + 1 { + return; + } + + let count = min(headers.1.len(), bodies.1.len()); + let mut imported = 0; + for i in 0..count { + let mut block_rlp = RlpStream::new_list(3); + block_rlp.append_raw(&headers.1[i].data, 1); + let body = Rlp::new(&bodies.1[i]); + block_rlp.append_raw(body.at(0).as_raw(), 1); + block_rlp.append_raw(body.at(1).as_raw(), 1); + let h = &headers.1[i].hash; + match io.chain().import_block(&block_rlp.out()) { + Err(ImportError::AlreadyInChain) => { + trace!(target: "sync", "Block already in chain {:?}", h); + self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = h.clone(); + }, + Err(ImportError::AlreadyQueued) => { + trace!(target: "sync", "Block already queued {:?}", h); + self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = h.clone(); + }, + Ok(()) => { + trace!(target: "sync", "Block queued {:?}", h); + self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = h.clone(); + imported += 1; + }, + Err(e) => { + debug!(target: "sync", "Bad block {:?} : {:?}", h, e); + restart = true; + } + } + } + trace!(target: "sync", "Imported {} of {}", imported, count); + } + + if restart { + self.restart(io); + return; + } + + self.headers.remove_head(&(self.last_imported_block + 1)); + self.bodies.remove_head(&(self.last_imported_block + 1)); + + if self.headers.is_empty() { + assert!(self.bodies.is_empty()); + self.complete_sync(); + } + } + + /// Remove downloaded bocks/headers starting from specified number. 
+ /// Used to recover from an error and re-download parts of the chain detected as bad. + fn remove_downloaded_blocks(&mut self, start: BlockNumber) { + for n in self.headers.get_tail(&start) { + match self.headers.find_item(&n) { + Some(ref header_data) => { + let header_to_delete = HeaderView::new(&header_data.data); + let header_id = HeaderId { + transactions_root: header_to_delete.transactions_root(), + uncles: header_to_delete.uncles_hash() + }; + self.header_ids.remove(&header_id); + }, + None => {} + } + self.downloading_bodies.remove(&n); + self.downloading_headers.remove(&n); + } + self.headers.remove_tail(&start); + self.bodies.remove_tail(&start); + } + + /// Request headers from a peer by block hash + fn request_headers_by_hash(&mut self, sync: &mut SyncIo, peer_id: &PeerId, h: &H256, count: usize, skip: usize, reverse: bool) { + trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, h); + let mut rlp = RlpStream::new_list(4); + rlp.append(h); + rlp.append(&count); + rlp.append(&skip); + rlp.append(&if reverse {1u32} else {0u32}); + self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); + } + + /// Request headers from a peer by block number + fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: &PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool) { + let mut rlp = RlpStream::new_list(4); + trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, n); + rlp.append(&n); + rlp.append(&count); + rlp.append(&skip); + rlp.append(&if reverse {1u32} else {0u32}); + self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); + } + + /// Request block bodies from a peer + fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: &PeerId, hashes: Vec) { + let mut rlp = RlpStream::new_list(hashes.len()); + trace!(target: "sync", "{} <- GetBlockBodies: {} entries", peer_id, hashes.len()); 
+ for h in hashes { + rlp.append(&h); + } + self.send_request(sync, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out()); + } + + /// Reset peer status after request is complete. + fn reset_peer_asking(&mut self, peer_id: &PeerId, asking: PeerAsking) { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + if peer.asking != asking { + warn!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); + } + else { + peer.asking = PeerAsking::Nothing; + } + } + + /// Generic request sender + fn send_request(&mut self, sync: &mut SyncIo, peer_id: &PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { + { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + if peer.asking != PeerAsking::Nothing { + warn!(target:"sync", "Asking {:?} while requesting {:?}", asking, peer.asking); + } + } + match sync.send(*peer_id, packet_id, packet) { + Err(e) => { + warn!(target:"sync", "Error sending request: {:?}", e); + sync.disable_peer(peer_id); + self.on_peer_aborting(sync, peer_id); + } + Ok(_) => { + let mut peer = self.peers.get_mut(&peer_id).unwrap(); + peer.asking = asking; + } + } + } + + /// Called when peer sends us new transactions + fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: &PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + Ok(()) + } + + /// Send Status message + fn send_status(&mut self, io: &mut SyncIo, peer_id: &PeerId) { + let mut packet = RlpStream::new_list(5); + let chain = io.chain().chain_info(); + packet.append(&(PROTOCOL_VERSION as u32)); + packet.append(&NETWORK_ID); //TODO: network id + packet.append(&chain.total_difficulty); + packet.append(&chain.best_block_hash); + packet.append(&chain.genesis_hash); + //TODO: handle timeout for status request + match io.send(*peer_id, STATUS_PACKET, packet.out()) { + Err(e) => { + warn!(target:"sync", "Error sending status request: {:?}", e); + io.disable_peer(peer_id); + } + Ok(_) => () + } + } + 
+ /// Respond to GetBlockHeaders request + fn return_block_headers(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + // Packet layout: + // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] + let max_headers: usize = try!(r.val_at(1)); + let skip: usize = try!(r.val_at(2)); + let reverse: bool = try!(r.val_at(3)); + let last = io.chain().chain_info().best_block_number; + let mut number = if try!(r.at(0)).size() == 32 { + // id is a hash + let hash: H256 = try!(r.val_at(0)); + trace!(target: "sync", "-> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse); + match io.chain().block_header(&hash) { + Some(hdr) => From::from(HeaderView::new(&hdr).number()), + None => last + } + } + else { + trace!(target: "sync", "-> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", try!(r.val_at::(0)), max_headers, skip, reverse); + try!(r.val_at(0)) + }; + + if reverse { + number = min(last, number); + } else { + number = max(1, number); + } + let max_count = min(MAX_HEADERS_TO_SEND, max_headers); + let mut count = 0; + let mut data = Bytes::new(); + let inc = (skip + 1) as BlockNumber; + while number <= last && number > 0 && count < max_count { + match io.chain().block_header_at(number) { + Some(mut hdr) => { + data.append(&mut hdr); + count += 1; + } + None => {} + } + if reverse { + if number <= inc { + break; + } + number -= inc; + } + else { + number += inc; + } + } + let mut rlp = RlpStream::new_list(count as usize); + rlp.append_raw(&data, count as usize); + io.respond(BLOCK_HEADERS_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); + trace!(target: "sync", "-> GetBlockHeaders: returned {} entries", count); + Ok(()) + } + + /// Respond to GetBlockBodies request + fn return_block_bodies(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let mut count = r.item_count(); + if count == 0 { + 
debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); + return Ok(()); + } + trace!(target: "sync", "-> GetBlockBodies: {} entries", count); + count = min(count, MAX_BODIES_TO_SEND); + let mut added = 0usize; + let mut data = Bytes::new(); + for i in 0..count { + match io.chain().block_body(&try!(r.val_at::(i))) { + Some(mut hdr) => { + data.append(&mut hdr); + added += 1; + } + None => {} + } + } + let mut rlp = RlpStream::new_list(added); + rlp.append_raw(&data, added); + io.respond(BLOCK_BODIES_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); + trace!(target: "sync", "-> GetBlockBodies: returned {} entries", added); + Ok(()) + } + + /// Respond to GetNodeData request + fn return_node_data(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let mut count = r.item_count(); + if count == 0 { + debug!(target: "sync", "Empty GetNodeData request, ignoring."); + return Ok(()); + } + count = min(count, MAX_NODE_DATA_TO_SEND); + let mut added = 0usize; + let mut data = Bytes::new(); + for i in 0..count { + match io.chain().state_data(&try!(r.val_at::(i))) { + Some(mut hdr) => { + data.append(&mut hdr); + added += 1; + } + None => {} + } + } + let mut rlp = RlpStream::new_list(added); + rlp.append_raw(&data, added); + io.respond(NODE_DATA_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); + Ok(()) + } + + /// Respond to GetReceipts request + fn return_receipts(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let mut count = r.item_count(); + if count == 0 { + debug!(target: "sync", "Empty GetReceipts request, ignoring."); + return Ok(()); + } + count = min(count, MAX_RECEIPTS_TO_SEND); + let mut added = 0usize; + let mut data = Bytes::new(); + for i in 0..count { + match io.chain().block_receipts(&try!(r.val_at::(i))) { + Some(mut hdr) => { + data.append(&mut hdr); + added += 1; + } + None => {} + } + } + 
let mut rlp = RlpStream::new_list(added); + rlp.append_raw(&data, added); + io.respond(RECEIPTS_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); + Ok(()) + } + + /// Dispatch incoming requests and responses + pub fn on_packet(&mut self, io: &mut SyncIo, peer: &PeerId, packet_id: u8, data: &[u8]) { + let rlp = UntrustedRlp::new(data); + let result = match packet_id { + STATUS_PACKET => self.on_peer_status(io, peer, &rlp), + TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), + GET_BLOCK_HEADERS_PACKET => self.return_block_headers(io, &rlp), + BLOCK_HEADERS_PACKET => self.on_peer_block_headers(io, peer, &rlp), + GET_BLOCK_BODIES_PACKET => self.return_block_bodies(io, &rlp), + BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp), + NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp), + NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), + GET_NODE_DATA_PACKET => self.return_node_data(io, &rlp), + GET_RECEIPTS_PACKET => self.return_receipts(io, &rlp), + _ => { + debug!(target: "sync", "Unknown packet {}", packet_id); + Ok(()) + } + }; + result.unwrap_or_else(|e| { + debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); + }) + } + + /// Maintain other peers. Send out any new blocks and transactions + pub fn maintain_sync(&mut self, _io: &mut SyncIo) { + } +} + diff --git a/src/sync/io.rs b/src/sync/io.rs new file mode 100644 index 000000000..9806a3bf5 --- /dev/null +++ b/src/sync/io.rs @@ -0,0 +1,53 @@ +use client::BlockChainClient; +use util::network::{HandlerIo, PeerId, PacketId,}; +use util::error::UtilError; + +/// IO interface for the syning handler. +/// Provides peer connection management and an interface to the blockchain client. +// TODO: ratings +pub trait SyncIo { + /// Disable a peer + fn disable_peer(&mut self, peer_id: &PeerId); + /// Respond to current request with a packet. Can be called from an IO handler for incoming packet. 
+ fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError>; + /// Send a packet to a peer. + fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError>; + /// Get the blockchain + fn chain<'s>(&'s mut self) -> &'s mut BlockChainClient; +} + +/// Wraps `HandlerIo` and the blockchain client +pub struct NetSyncIo<'s, 'h> where 'h:'s { + network: &'s mut HandlerIo<'h>, + chain: &'s mut BlockChainClient +} + +impl<'s, 'h> NetSyncIo<'s, 'h> { + /// Creates a new instance from the `HandlerIo` and the blockchain client reference. + pub fn new(network: &'s mut HandlerIo<'h>, chain: &'s mut BlockChainClient) -> NetSyncIo<'s,'h> { + NetSyncIo { + network: network, + chain: chain, + } + } +} + +impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> { + fn disable_peer(&mut self, peer_id: &PeerId) { + self.network.disable_peer(*peer_id); + } + + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError>{ + self.network.respond(packet_id, data) + } + + fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError>{ + self.network.send(peer_id, packet_id, data) + } + + fn chain<'a>(&'a mut self) -> &'a mut BlockChainClient { + self.chain + } +} + + diff --git a/src/sync/mod.rs b/src/sync/mod.rs new file mode 100644 index 000000000..300465014 --- /dev/null +++ b/src/sync/mod.rs @@ -0,0 +1,100 @@ +/// Blockchain sync module +/// Implements ethereum protocol version 63 as specified here: +/// https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol +/// +/// Usage example: +/// +/// ```rust +/// extern crate ethcore_util as util; +/// extern crate ethcore; +/// use std::env; +/// use std::sync::Arc; +/// use util::network::NetworkService; +/// use ethcore::client::Client; +/// use ethcore::sync::EthSync; +/// use ethcore::ethereum; +/// +/// fn main() { +/// let mut service = NetworkService::start().unwrap(); +/// let dir = env::temp_dir(); +/// let client = 
Arc::new(Client::new(ethereum::new_frontier(), &dir).unwrap()); +/// EthSync::register(&mut service, client); +/// } +/// ``` + +use std::sync::Arc; +use client::BlockChainClient; +use util::network::{ProtocolHandler, NetworkService, HandlerIo, TimerToken, PeerId, Message}; +use sync::chain::ChainSync; +use sync::io::NetSyncIo; + +mod chain; +mod io; +mod range_collection; + +#[cfg(test)] +mod tests; + +/// Ethereum network protocol handler +pub struct EthSync { + /// Shared blockchain client. TODO: this should evetually become an IPC endpoint + chain: Arc, + /// Sync strategy + sync: ChainSync +} + +pub use self::chain::SyncStatus; + +impl EthSync { + /// Creates and register protocol with the network service + pub fn register(service: &mut NetworkService, chain: Arc) { + let sync = Box::new(EthSync { + chain: chain, + sync: ChainSync::new(), + }); + service.register_protocol(sync, "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); + } + + /// Get sync status + pub fn status(&self) -> SyncStatus { + self.sync.status() + } + + /// Stop sync + pub fn stop(&mut self, io: &mut HandlerIo) { + self.sync.abort(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); + } + + /// Restart sync + pub fn restart(&mut self, io: &mut HandlerIo) { + self.sync.restart(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); + } +} + +impl ProtocolHandler for EthSync { + fn initialize(&mut self, io: &mut HandlerIo) { + self.sync.restart(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); + io.register_timer(1000).unwrap(); + } + + fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]) { + self.sync.on_packet(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer, packet_id, data); + } + + fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId) { + self.sync.on_peer_connected(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer); + } + + fn disconnected(&mut self, io: 
&mut HandlerIo, peer: &PeerId) { + self.sync.on_peer_aborting(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer); + } + + fn timeout(&mut self, io: &mut HandlerIo, _timer: TimerToken) { + self.sync.maintain_sync(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); + } + + fn message(&mut self, _io: &mut HandlerIo, _message: &Message) { + } +} + + diff --git a/src/sync/range_collection.rs b/src/sync/range_collection.rs new file mode 100644 index 000000000..d212625be --- /dev/null +++ b/src/sync/range_collection.rs @@ -0,0 +1,259 @@ +/// This module defines a trait for a collection of ranged values and an implementation +/// for this trait over sorted vector. + +use std::ops::{Add, Sub, Range}; + +pub trait ToUsize { + fn to_usize(&self) -> usize; +} + +pub trait FromUsize { + fn from_usize(s: usize) -> Self; +} + +/// A key-value collection orderd by key with sequential key-value pairs grouped together. +/// Such group is called a range. +/// E.g. a set of collection of 5 pairs {1, a}, {2, b}, {10, x}, {11, y}, {12, z} will be grouped into two ranges: {1, [a,b]}, {10, [x,y,z]} +pub trait RangeCollection { + /// Check if the given key is present in the collection. + fn have_item(&self, key: &K) -> bool; + /// Get value by key. + fn find_item(&self, key: &K) -> Option<&V>; + /// Get a range of keys from `key` till the end of the range that has `key` + /// Returns an empty range is key does not exist. + fn get_tail(&mut self, key: &K) -> Range; + /// Remove all elements < `start` in the range that contains `start` - 1 + fn remove_head(&mut self, start: &K); + /// Remove all elements >= `start` in the range that contains `start` + fn remove_tail(&mut self, start: &K); + /// Remove all elements >= `tail` + fn insert_item(&mut self, key: K, value: V); + /// Get an iterator over ranges + fn range_iter<'c>(&'c self) -> RangeIterator<'c, K, V>; +} + +/// Range iterator. 
For each range yelds a key for the first element of the range and a vector of values. +pub struct RangeIterator<'c, K:'c, V:'c> { + range: usize, + collection: &'c Vec<(K, Vec)> +} + +impl<'c, K:'c, V:'c> Iterator for RangeIterator<'c, K, V> where K: Add + FromUsize + ToUsize + Copy { + type Item = (K, &'c [V]); + // The 'Iterator' trait only requires the 'next' method to be defined. The + // return type is 'Option', 'None' is returned when the 'Iterator' is + // over, otherwise the next value is returned wrapped in 'Some' + fn next(&mut self) -> Option<(K, &'c [V])> { + if self.range > 0 { + self.range -= 1; + } + else { + return None; + } + match self.collection.get(self.range) { + Some(&(ref k, ref vec)) => { + Some((*k, &vec)) + }, + None => None + } + } +} + +impl RangeCollection for Vec<(K, Vec)> where K: Ord + PartialEq + Add + Sub + Copy + FromUsize + ToUsize { + fn range_iter<'c>(&'c self) -> RangeIterator<'c, K, V> { + RangeIterator { + range: self.len(), + collection: self + } + } + + fn have_item(&self, key: &K) -> bool { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(_) => true, + Err(index) => match self.get(index) { + Some(&(ref k, ref v)) => k <= key && (*k + FromUsize::from_usize(v.len())) > *key, + _ => false + }, + } + } + + fn find_item(&self, key: &K) -> Option<&V> { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => self.get(index).unwrap().1.get(0), + Err(index) => match self.get(index) { + Some(&(ref k, ref v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => v.get((*key - *k).to_usize()), + _ => None + }, + } + } + + fn get_tail(&mut self, key: &K) -> Range { + let kv = *key; + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => kv..(kv + FromUsize::from_usize(self[index].1.len())), + Err(index) => { + match self.get_mut(index) { + Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { + kv..(*k + 
FromUsize::from_usize(v.len())) + } + _ => kv..kv + } + }, + } + } + /// Remove element key and following elements in the same range + fn remove_tail(&mut self, key: &K) { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => { self.remove(index); }, + Err(index) =>{ + let mut empty = false; + match self.get_mut(index) { + Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { + v.truncate((*key - *k).to_usize()); + empty = v.is_empty(); + } + _ => {} + } + if empty { + self.remove(index); + } + }, + } + } + + /// Remove range elements up to key + fn remove_head(&mut self, key: &K) { + if *key == FromUsize::from_usize(0) { + return + } + + let prev = *key - FromUsize::from_usize(1); + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(_) => { }, //start of range, do nothing. + Err(index) => { + let mut empty = false; + match self.get_mut(index) { + Some(&mut (ref mut k, ref mut v)) if *k <= prev && (*k + FromUsize::from_usize(v.len())) > prev => { + let tail = v.split_off((*key - *k).to_usize()); + empty = tail.is_empty(); + let removed = ::std::mem::replace(v, tail); + let new_k = *k + FromUsize::from_usize(removed.len()); + ::std::mem::replace(k, new_k); + } + _ => {} + } + if empty { + self.remove(index); + } + }, + } + } + + fn insert_item(&mut self, key: K, value: V) { + assert!(!self.have_item(&key)); + + let lower = match self.binary_search_by(|&(k, _)| k.cmp(&key).reverse()) { + Ok(index) => index, + Err(index) => index, + }; + + let mut to_remove: Option = None; + if lower < self.len() && self[lower].0 + FromUsize::from_usize(self[lower].1.len()) == key { + // extend into existing chunk + self[lower].1.push(value); + } + else { + // insert a new chunk + let range: Vec = vec![value]; + self.insert(lower, (key, range)); + }; + if lower > 0 { + let next = lower - 1; + if next < self.len() + { + { + let (mut next, mut inserted) = self.split_at_mut(lower); + let mut next = 
next.last_mut().unwrap();
+                    let mut inserted = inserted.first_mut().unwrap();
+                    if next.0 == key + FromUsize::from_usize(1)
+                    {
+                        inserted.1.append(&mut next.1);
+                        to_remove = Some(lower - 1);
+                    }
+                }
+
+                if let Some(r) = to_remove {
+                    self.remove(r);
+                }
+            }
+        }
+    }
+}
+
+#[test]
+fn test_range() {
+    use std::cmp::{Ordering};
+
+    let mut ranges: Vec<(u64, Vec<char>)> = Vec::new();
+    assert_eq!(ranges.range_iter().next(), None);
+    assert_eq!(ranges.find_item(&1), None);
+    assert!(!ranges.have_item(&1));
+    assert_eq!(ranges.get_tail(&0), 0..0);
+
+    ranges.insert_item(17, 'q');
+    assert_eq!(ranges.range_iter().cmp(vec![(17, &['q'][..])]), Ordering::Equal);
+    assert_eq!(ranges.find_item(&17), Some(&'q'));
+    assert!(ranges.have_item(&17));
+    assert_eq!(ranges.get_tail(&17), 17..18);
+
+    ranges.insert_item(18, 'r');
+    assert_eq!(ranges.range_iter().cmp(vec![(17, &['q', 'r'][..])]), Ordering::Equal);
+    assert_eq!(ranges.find_item(&18), Some(&'r'));
+    assert!(ranges.have_item(&18));
+    assert_eq!(ranges.get_tail(&17), 17..19);
+
+    ranges.insert_item(16, 'p');
+    assert_eq!(ranges.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal);
+    assert_eq!(ranges.find_item(&16), Some(&'p'));
+    assert_eq!(ranges.find_item(&17), Some(&'q'));
+    assert_eq!(ranges.find_item(&18), Some(&'r'));
+    assert!(ranges.have_item(&16));
+    assert_eq!(ranges.get_tail(&17), 17..19);
+    assert_eq!(ranges.get_tail(&16), 16..19);
+
+    ranges.insert_item(2, 'b');
+    assert_eq!(ranges.range_iter().cmp(vec![(2, &['b'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
+    assert_eq!(ranges.find_item(&2), Some(&'b'));
+
+    ranges.insert_item(3, 'c');
+    ranges.insert_item(4, 'd');
+    assert_eq!(ranges.get_tail(&3), 3..5);
+    assert_eq!(ranges.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
+
+    let mut r = ranges.clone();
+    r.remove_head(&1);
+    assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
+    r.remove_head(&2);
+    assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
+    r.remove_head(&3);
+    assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
+    r.remove_head(&10);
+    assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
+    r.remove_head(&5);
+    assert_eq!(r.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal);
+    r.remove_head(&19);
+    assert_eq!(r.range_iter().next(), None);
+
+    let mut r = ranges.clone();
+    r.remove_tail(&20);
+    assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
+    r.remove_tail(&17);
+    assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal);
+    r.remove_tail(&16);
+    assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal);
+    r.remove_tail(&3);
+    assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal);
+    r.remove_tail(&2);
+    assert_eq!(r.range_iter().next(), None);
+}
+
diff --git a/src/sync/tests.rs b/src/sync/tests.rs
new file mode 100644
index 000000000..6a045fa1e
--- /dev/null
+++ b/src/sync/tests.rs
@@ -0,0 +1,343 @@
+use std::collections::{HashMap, VecDeque};
+use util::bytes::Bytes;
+use util::hash::{H256, FixedHash};
+use util::uint::{U256};
+use util::sha3::Hashable;
+use util::rlp::{self, Rlp, RlpStream, View, Stream};
+use util::network::{PeerId, PacketId};
+use util::error::UtilError;
+use client::{BlockChainClient, BlockStatus, TreeRoute, BlockQueueStatus, BlockChainInfo, ImportResult};
+use header::{Header as BlockHeader, BlockNumber};
+use sync::io::SyncIo;
+use sync::chain::ChainSync;
+
+struct TestBlockChainClient {
+    blocks: HashMap<H256, Bytes>,
+    numbers: HashMap<usize, H256>,
+    genesis_hash: H256,
+    last_hash: H256,
+    difficulty: U256
+}
+
+impl TestBlockChainClient {
+    fn new() -> TestBlockChainClient {
+
+        let mut client =
TestBlockChainClient { + blocks: HashMap::new(), + numbers: HashMap::new(), + genesis_hash: H256::new(), + last_hash: H256::new(), + difficulty: From::from(0), + }; + client.add_blocks(1, true); // add genesis block + client.genesis_hash = client.last_hash.clone(); + client + } + + pub fn add_blocks(&mut self, count: usize, empty: bool) { + for n in self.numbers.len()..(self.numbers.len() + count) { + let mut header = BlockHeader::new(); + header.difficulty = From::from(n); + header.parent_hash = self.last_hash.clone(); + header.number = n as BlockNumber; + let mut uncles = RlpStream::new_list(if empty {0} else {1}); + if !empty { + uncles.append(&H256::from(&U256::from(n))); + header.uncles_hash = uncles.as_raw().sha3(); + } + let mut rlp = RlpStream::new_list(3); + rlp.append(&header); + rlp.append_raw(&rlp::NULL_RLP, 1); + rlp.append_raw(uncles.as_raw(), 1); + self.import_block(rlp.as_raw()).unwrap(); + } + } +} + +impl BlockChainClient for TestBlockChainClient { + fn block_header(&self, h: &H256) -> Option { + self.blocks.get(h).map(|r| Rlp::new(r).at(0).as_raw().to_vec()) + + } + + fn block_body(&self, h: &H256) -> Option { + self.blocks.get(h).map(|r| { + let mut stream = RlpStream::new_list(2); + stream.append_raw(Rlp::new(&r).at(1).as_raw(), 1); + stream.append_raw(Rlp::new(&r).at(2).as_raw(), 1); + stream.out() + }) + } + + fn block(&self, h: &H256) -> Option { + self.blocks.get(h).map(|b| b.clone()) + } + + fn block_status(&self, h: &H256) -> BlockStatus { + match self.blocks.get(h) { + Some(_) => BlockStatus::InChain, + None => BlockStatus::Unknown + } + } + + fn block_header_at(&self, n: BlockNumber) -> Option { + self.numbers.get(&(n as usize)).and_then(|h| self.block_header(h)) + } + + fn block_body_at(&self, n: BlockNumber) -> Option { + self.numbers.get(&(n as usize)).and_then(|h| self.block_body(h)) + } + + fn block_at(&self, n: BlockNumber) -> Option { + self.numbers.get(&(n as usize)).map(|h| self.blocks.get(h).unwrap().clone()) + } + + fn 
block_status_at(&self, n: BlockNumber) -> BlockStatus { + if (n as usize) < self.blocks.len() { + BlockStatus::InChain + } else { + BlockStatus::Unknown + } + } + + fn tree_route(&self, _from: &H256, _to: &H256) -> Option { + Some(TreeRoute { + blocks: Vec::new(), + ancestor: H256::new(), + index: 0 + }) + } + + fn state_data(&self, _h: &H256) -> Option { + None + } + + fn block_receipts(&self, _h: &H256) -> Option { + None + } + + fn import_block(&mut self, b: &[u8]) -> ImportResult { + let header = Rlp::new(&b).val_at::(0); + let number: usize = header.number as usize; + if number > self.blocks.len() { + panic!("Unexpected block number. Expected {}, got {}", self.blocks.len(), number); + } + if number > 0 { + match self.blocks.get(&header.parent_hash) { + Some(parent) => { + let parent = Rlp::new(parent).val_at::(0); + if parent.number != (header.number - 1) { + panic!("Unexpected block parent"); + } + }, + None => { + panic!("Unknown block parent {:?} for block {}", header.parent_hash, number); + } + } + } + if number == self.numbers.len() { + self.difficulty = self.difficulty + header.difficulty; + self.last_hash = header.hash(); + self.blocks.insert(header.hash(), b.to_vec()); + self.numbers.insert(number, header.hash()); + let mut parent_hash = header.parent_hash; + if number > 0 { + let mut n = number - 1; + while n > 0 && self.numbers[&n] != parent_hash { + *self.numbers.get_mut(&n).unwrap() = parent_hash.clone(); + n -= 1; + parent_hash = Rlp::new(&self.blocks[&parent_hash]).val_at::(0).parent_hash; + } + } + } + else { + self.blocks.insert(header.hash(), b.to_vec()); + } + Ok(()) + } + + fn queue_status(&self) -> BlockQueueStatus { + BlockQueueStatus { + full: false, + } + } + + fn clear_queue(&mut self) { + } + + fn chain_info(&self) -> BlockChainInfo { + BlockChainInfo { + total_difficulty: self.difficulty, + pending_total_difficulty: self.difficulty, + genesis_hash: self.genesis_hash.clone(), + best_block_hash: self.last_hash.clone(), + 
best_block_number: self.blocks.len() as BlockNumber - 1, + } + } +} + +struct TestIo<'p> { + chain: &'p mut TestBlockChainClient, + queue: &'p mut VecDeque, + sender: Option, +} + +impl<'p> TestIo<'p> { + fn new(chain: &'p mut TestBlockChainClient, queue: &'p mut VecDeque, sender: Option) -> TestIo<'p> { + TestIo { + chain: chain, + queue: queue, + sender: sender + } + } +} + +impl<'p> SyncIo for TestIo<'p> { + fn disable_peer(&mut self, _peer_id: &PeerId) { + } + + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { + self.queue.push_back(TestPacket { + data: data, + packet_id: packet_id, + recipient: self.sender.unwrap() + }); + Ok(()) + } + + fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { + self.queue.push_back(TestPacket { + data: data, + packet_id: packet_id, + recipient: peer_id, + }); + Ok(()) + } + + fn chain<'a>(&'a mut self) -> &'a mut BlockChainClient { + self.chain + } +} + +struct TestPacket { + data: Bytes, + packet_id: PacketId, + recipient: PeerId, +} + +struct TestPeer { + chain: TestBlockChainClient, + sync: ChainSync, + queue: VecDeque, +} + +struct TestNet { + peers: Vec +} + +impl TestNet { + pub fn new(n: usize) -> TestNet { + let mut net = TestNet { + peers: Vec::new(), + }; + for _ in 0..n { + net.peers.push(TestPeer { + chain: TestBlockChainClient::new(), + sync: ChainSync::new(), + queue: VecDeque::new(), + }); + } + net + } + + pub fn peer(&self, i: usize) -> &TestPeer { + self.peers.get(i).unwrap() + } + + pub fn peer_mut(&mut self, i: usize) -> &mut TestPeer { + self.peers.get_mut(i).unwrap() + } + + pub fn start(&mut self) { + for peer in 0..self.peers.len() { + for client in 0..self.peers.len() { + if peer != client { + let mut p = self.peers.get_mut(peer).unwrap(); + p.sync.on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), &(client as PeerId)); + } + } + } + } + + pub fn sync_step(&mut self) { + for peer in 
0..self.peers.len() { + match self.peers[peer].queue.pop_front() { + Some(packet) => { + let mut p = self.peers.get_mut(packet.recipient).unwrap(); + trace!("--- {} -> {} ---", peer, packet.recipient); + p.sync.on_packet(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), &(peer as PeerId), packet.packet_id, &packet.data); + trace!("----------------"); + }, + None => {} + } + let mut p = self.peers.get_mut(peer).unwrap(); + p.sync.maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); + } + } + + pub fn sync(&mut self) { + self.start(); + while !self.done() { + self.sync_step() + } + } + + pub fn done(&self) -> bool { + self.peers.iter().all(|p| p.queue.is_empty()) + } +} + + +#[test] +fn full_sync_two_peers() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + net.peer_mut(1).chain.add_blocks(1000, false); + net.peer_mut(2).chain.add_blocks(1000, false); + net.sync(); + assert!(net.peer(0).chain.block_at(1000).is_some()); + assert_eq!(net.peer(0).chain.blocks, net.peer(1).chain.blocks); +} + +#[test] +fn full_sync_empty_blocks() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + for n in 0..200 { + net.peer_mut(1).chain.add_blocks(5, n % 2 == 0); + net.peer_mut(2).chain.add_blocks(5, n % 2 == 0); + } + net.sync(); + assert!(net.peer(0).chain.block_at(1000).is_some()); + assert_eq!(net.peer(0).chain.blocks, net.peer(1).chain.blocks); +} + +#[test] +fn forked_sync() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + net.peer_mut(0).chain.add_blocks(300, false); + net.peer_mut(1).chain.add_blocks(300, false); + net.peer_mut(2).chain.add_blocks(300, false); + net.peer_mut(0).chain.add_blocks(100, true); //fork + net.peer_mut(1).chain.add_blocks(200, false); + net.peer_mut(2).chain.add_blocks(200, false); + net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2 + net.peer_mut(2).chain.add_blocks(10, true); + // peer 1 has the best chain of 601 blocks + let peer1_chain = 
net.peer(1).chain.numbers.clone(); + net.sync(); + assert_eq!(net.peer(0).chain.numbers, peer1_chain); + assert_eq!(net.peer(1).chain.numbers, peer1_chain); + assert_eq!(net.peer(2).chain.numbers, peer1_chain); +} diff --git a/src/views.rs b/src/views.rs index 8e202d0b7..7a1a20e9f 100644 --- a/src/views.rs +++ b/src/views.rs @@ -24,18 +24,18 @@ impl<'a> BlockView<'a> { } /// Return reference to underlaying rlp. - pub fn rlp(&self) -> &Rlp<'a> { - &self.rlp + pub fn rlp(&self) -> &Rlp<'a> { + &self.rlp } /// Create new Header object from header rlp. - pub fn header(&self) -> Header { + pub fn header(&self) -> Header { self.rlp.val_at(0) } /// Create new header view obto block head rlp. - pub fn header_view(&self) -> HeaderView<'a> { - HeaderView::new_from_rlp(self.rlp.at(0)) + pub fn header_view(&self) -> HeaderView<'a> { + HeaderView::new_from_rlp(self.rlp.at(0)) } /// Return List of transactions in given block. @@ -44,7 +44,7 @@ impl<'a> BlockView<'a> { } /// Return transaction hashes. - pub fn transaction_hashes(&self) -> Vec { + pub fn transaction_hashes(&self) -> Vec { self.rlp.at(1).iter().map(|rlp| rlp.as_raw().sha3()).collect() } @@ -54,7 +54,7 @@ impl<'a> BlockView<'a> { } /// Return list of uncle hashes of given block. - pub fn uncle_hashes(&self) -> Vec { + pub fn uncle_hashes(&self) -> Vec { self.rlp.at(2).iter().map(|rlp| rlp.as_raw().sha3()).collect() } } @@ -105,7 +105,7 @@ impl<'a> HeaderView<'a> { /// Returns block receipts root. pub fn receipts_root(&self) -> H256 { self.rlp.val_at(5) } - + /// Returns block log bloom. pub fn log_bloom(&self) -> H2048 { self.rlp.val_at(6) } @@ -113,7 +113,7 @@ impl<'a> HeaderView<'a> { pub fn difficulty(&self) -> U256 { self.rlp.val_at(7) } /// Returns block number. - pub fn number(&self) -> usize { self.rlp.val_at(8) } + pub fn number(&self) -> BlockNumber { self.rlp.val_at(8) } /// Returns block gas limit. 
pub fn gas_limit(&self) -> U256 { self.rlp.val_at(9) } @@ -128,7 +128,7 @@ impl<'a> HeaderView<'a> { pub fn extra_data(&self) -> Bytes { self.rlp.val_at(12) } /// Returns block seal. - pub fn seal(&self) -> Vec { + pub fn seal(&self) -> Vec { let mut seal = vec![]; for i in 13..self.rlp.item_count() { seal.push(self.rlp.val_at(i));