From bf9667a206a10ce5d5172e0e411bf9cd50e99ee6 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sat, 26 Dec 2015 15:47:07 +0100 Subject: [PATCH] sync tests --- src/blockchain.rs | 62 +++++------ src/eth.rs | 8 +- src/genesis.rs | 12 +- src/header.rs | 50 ++++++--- src/sync/chain.rs | 86 +++++++------- src/sync/mod.rs | 58 ++++++++-- src/sync/tests.rs | 278 +++++++++++++++++++++++++++++++++++++++++++++- 7 files changed, 436 insertions(+), 118 deletions(-) diff --git a/src/blockchain.rs b/src/blockchain.rs index df729787a..f64d81189 100644 --- a/src/blockchain.rs +++ b/src/blockchain.rs @@ -19,7 +19,7 @@ use transaction::*; use views::*; /// Represents a tree route between `from` block and `to` block: -/// +/// /// - `blocks` - a vector of hashes of all blocks, ordered from `from` to `to`. /// /// - `ancestor` - best common ancestor of these blocks. @@ -59,7 +59,7 @@ impl BestBlock { } /// Structure providing fast access to blockchain data. -/// +/// /// **Does not do input data verification.** pub struct BlockChain { best_block: RefCell, @@ -80,7 +80,7 @@ pub struct BlockChain { impl BlockChain { /// Create new instance of blockchain from given Genesis - /// + /// /// ```rust /// extern crate ethcore_util as util; /// extern crate ethcore; @@ -90,7 +90,7 @@ impl BlockChain { /// use ethcore::blockchain::*; /// use util::hash::*; /// use util::uint::*; - /// + /// /// fn main() { /// let genesis = Genesis::new_frontier(); /// @@ -152,7 +152,7 @@ impl BlockChain { batch.put_extras(&header.number(), &hash); batch.put(b"best", &hash).unwrap(); bc.extras_db.write(batch).unwrap(); - + hash } }; @@ -168,44 +168,44 @@ impl BlockChain { } /// Returns a tree route between `from` and `to`, which is a tuple of: - /// + /// /// - a vector of hashes of all blocks, ordered from `from` to `to`. /// /// - common ancestor of these blocks. /// /// - an index where best common ancestor would be - /// + /// /// 1.) from newer to older - /// + /// /// - bc: `A1 -> A2 -> A3 -> A4 -> A5` /// - from: A5, to: A4 - /// - route: + /// - route: /// /// ```json /// { blocks: [A5], ancestor: A4, index: 1 } /// ``` - /// + /// /// 2.) from older to newer - /// + /// /// - bc: `A1 -> A2 -> A3 -> A4 -> A5` /// - from: A3, to: A4 - /// - route: - /// + /// - route: + /// /// ```json /// { blocks: [A4], ancestor: A3, index: 0 } /// ``` /// /// 3.) fork: /// - /// - bc: + /// - bc: /// /// ```text /// A1 -> A2 -> A3 -> A4 /// -> B3 -> B4 - /// ``` + /// ``` /// - from: B4, to: A4 - /// - route: - /// + /// - route: + /// /// ```json /// { blocks: [B4, B3, A3, A4], ancestor: A2, index: 2 } /// ``` @@ -302,7 +302,7 @@ impl BlockChain { // create views onto rlp let block = BlockView::new(bytes); let header = block.header_view(); - + // prepare variables let hash = block.sha3(); let mut parent_details = self.block_details(&header.parent_hash()).expect("Invalid parent hash."); @@ -317,7 +317,7 @@ impl BlockChain { parent: parent_hash.clone(), children: vec![] }; - + // prepare the batch let batch = WriteBatch::new(); @@ -333,7 +333,7 @@ impl BlockChain { return (batch, None); } - // if its new best block we need to make sure that all ancestors + // if its new best block we need to make sure that all ancestors // are moved to "canon chain" // find the route between old best block and the new one let best_hash = self.best_block_hash(); @@ -368,7 +368,7 @@ impl BlockChain { (batch, Some(best_block)) } - /// Returns true if the given block is known + /// Returns true if the given block is known /// (though not necessarily a part of the canon chain). pub fn is_known(&self, hash: &H256) -> bool { self.query_extras_exist(hash, &self.block_details) @@ -471,8 +471,8 @@ impl BlockChain { } } - fn query_extras(&self, hash: &K, cache: &RefCell>) -> Option where - T: Clone + Decodable + ExtrasIndexable, + fn query_extras(&self, hash: &K, cache: &RefCell>) -> Option where + T: Clone + Decodable + ExtrasIndexable, K: ExtrasSliceConvertable + Eq + Hash + Clone { { let read = cache.borrow(); @@ -489,7 +489,7 @@ impl BlockChain { }) } - fn query_extras_exist(&self, hash: &K, cache: &RefCell>) -> bool where + fn query_extras_exist(&self, hash: &K, cache: &RefCell>) -> bool where K: ExtrasSliceConvertable + Eq + Hash + Clone, T: ExtrasIndexable { { @@ -541,7 +541,7 @@ mod tests { dir.push(H32::random().hex()); let bc = BlockChain::new(&genesis, &dir); - + let genesis_hash = H256::from_str("3caa2203f3d7c136c0295ed128a7d31cea520b1ca5e27afe17d0853331798942").unwrap(); assert_eq!(bc.genesis_hash(), genesis_hash.clone()); @@ -549,7 +549,7 @@ mod tests { assert_eq!(bc.best_block_hash(), genesis_hash.clone()); assert_eq!(bc.block_hash(&U256::from(0u8)), Some(genesis_hash.clone())); assert_eq!(bc.block_hash(&U256::from(1u8)), None); - + let first = "f90285f90219a03caa2203f3d7c136c0295ed128a7d31cea520b1ca5e27afe17d0853331798942a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347948888f1f195afa192cfee860698584c030f4c9db1a0bac6177a79e910c98d86ec31a09ae37ac2de15b754fd7bed1ba52362c49416bfa0d45893a296c1490a978e0bd321b5f2635d8280365c1fe9f693d65f233e791344a0c7778a7376099ee2e5c455791c1885b5c361b95713fddcbe32d97fd01334d296b90100000000000000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000200000000000000000008000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000200000000000400000000000000000000000000000000000000000000000000000008302000001832fefd882560b845627cb99a00102030405060708091011121314151617181920212223242526272829303132a08ccb2837fb2923bd97e8f2d08ea32012d6e34be018c73e49a0f98843e8f47d5d88e53be49fec01012ef866f864800a82c35094095e7baea6a6c7c4c2dfeb977efac326af552d8785012a05f200801ba0cb088b8d2ff76a7b2c6616c9d02fb6b7a501afbf8b69d7180b09928a1b80b5e4a06448fe7476c606582039bb72a9f6f4b4fad18507b8dfbd00eebbe151cc573cd2c0".from_hex().unwrap(); @@ -610,22 +610,22 @@ mod tests { assert_eq!(r0_1.blocks, [b1_hash.clone()]); assert_eq!(r0_1.index, 0); - let r0_2 = bc.tree_route(genesis_hash.clone(), b2_hash.clone()); + let r0_2 = bc.tree_route(genesis_hash.clone(), b2_hash.clone()); assert_eq!(r0_2.ancestor, genesis_hash); assert_eq!(r0_2.blocks, [b1_hash.clone(), b2_hash.clone()]); assert_eq!(r0_2.index, 0); - let r1_3a = bc.tree_route(b1_hash.clone(), b3a_hash.clone()); + let r1_3a = bc.tree_route(b1_hash.clone(), b3a_hash.clone()); assert_eq!(r1_3a.ancestor, b1_hash); assert_eq!(r1_3a.blocks, [b2_hash.clone(), b3a_hash.clone()]); assert_eq!(r1_3a.index, 0); - let r1_3b = bc.tree_route(b1_hash.clone(), b3b_hash.clone()); + let r1_3b = bc.tree_route(b1_hash.clone(), b3b_hash.clone()); assert_eq!(r1_3b.ancestor, b1_hash); assert_eq!(r1_3b.blocks, [b2_hash.clone(), b3b_hash.clone()]); assert_eq!(r1_3b.index, 0); - let r3a_3b = bc.tree_route(b3a_hash.clone(), b3b_hash.clone()); + let r3a_3b = bc.tree_route(b3a_hash.clone(), b3b_hash.clone()); assert_eq!(r3a_3b.ancestor, b2_hash); assert_eq!(r3a_3b.blocks, [b3a_hash.clone(), b3b_hash.clone()]); assert_eq!(r3a_3b.index, 1); @@ -639,7 +639,7 @@ mod tests { assert_eq!(r2_0.ancestor, genesis_hash); assert_eq!(r2_0.blocks, [b2_hash.clone(), b1_hash.clone()]); assert_eq!(r2_0.index, 2); - + let r3a_1 = bc.tree_route(b3a_hash.clone(), b1_hash.clone()); assert_eq!(r3a_1.ancestor, b1_hash); assert_eq!(r3a_1.blocks, [b3a_hash.clone(), b2_hash.clone()]); diff --git a/src/eth.rs b/src/eth.rs index e82a899a0..5eb7e9b6c 100644 --- a/src/eth.rs +++ b/src/eth.rs @@ -35,7 +35,7 @@ pub struct BlockQueueStatus { pub full: bool, } -pub struct TreeRoute; +pub type TreeRoute = ::blockchain::TreeRoute; pub type BlockNumber = u32; pub type BlockHeader = ::header::Header; @@ -46,9 +46,9 @@ pub trait BlockChainClient : Sync { fn block(&self, h: &H256) -> Option; fn block_status(&self, h: &H256) -> BlockStatus; fn block_header_at(&self, n: BlockNumber) -> Option; - fn block_body_at(&self, h: BlockNumber) -> Option; - fn block_at(&self, h: BlockNumber) -> Option; - fn block_status_at(&self, h: BlockNumber) -> BlockStatus; + fn block_body_at(&self, n: BlockNumber) -> Option; + fn block_at(&self, n: BlockNumber) -> Option; + fn block_status_at(&self, n: BlockNumber) -> BlockStatus; fn tree_route(&self, from: &H256, to: &H256) -> TreeRoute; fn state_data(&self, h: &H256) -> Option; fn block_receipts(&self, h: &H256) -> Option; diff --git a/src/genesis.rs b/src/genesis.rs index d4829a70e..3e95c098a 100644 --- a/src/genesis.rs +++ b/src/genesis.rs @@ -1,5 +1,6 @@ use std::io::Read; use std::str::FromStr; +use std::cell::Cell; use std::collections::HashMap; use rustc_serialize::base64::FromBase64; use rustc_serialize::json::Json; @@ -48,7 +49,7 @@ impl Genesis { Genesis { block: stream.out(), - state: state + state: state } } @@ -56,12 +57,12 @@ impl Genesis { fn load_genesis_json(json: &Json, state_root: &H256) -> (Header, HashMap) { // once we commit ourselves to some json parsing library (serde?) // move it to proper data structure - + let empty_list = RlpStream::new_list(0).out(); let empty_list_sha3 = empty_list.sha3(); let empty_data = encode(&""); let empty_data_sha3 = empty_data.sha3(); - + let header = Header { parent_hash: H256::from_str(&json["parentHash"].as_string().unwrap()[2..]).unwrap(), uncles_hash: empty_list_sha3.clone(), @@ -81,9 +82,10 @@ impl Genesis { let mixhash = H256::from_str(&json["mixhash"].as_string().unwrap()[2..]).unwrap(); let nonce = H64::from_str(&json["nonce"].as_string().unwrap()[2..]).unwrap(); vec![mixhash.to_vec(), nonce.to_vec()] - } + }, + hash: Cell::new(None) }; - + let mut state = HashMap::new(); let accounts = json["alloc"].as_object().expect("Missing genesis state"); for (address, acc) in accounts.iter() { diff --git a/src/header.rs b/src/header.rs index e161ac280..fdc51a549 100644 --- a/src/header.rs +++ b/src/header.rs @@ -1,4 +1,6 @@ +use std::cell::Cell; use util::hash::*; +use util::sha3::*; use util::bytes::*; use util::uint::*; use util::rlp::*; @@ -28,6 +30,8 @@ pub struct Header { pub difficulty: U256, pub seal: Vec, + + pub hash: Cell>, //TODO: make this private } impl Header { @@ -50,37 +54,49 @@ impl Header { difficulty: ZERO_U256.clone(), seal: vec![], + hash: Cell::new(None), } } pub fn hash(&self) -> H256 { - unimplemented!(); + let hash = self.hash.get(); + match hash { + Some(h) => h, + None => { + let mut stream = RlpStream::new(); + stream.append(self); + let h = stream.raw().sha3(); + self.hash.set(Some(h.clone())); + h + } + } } } impl Decodable for Header { fn decode(decoder: &D) -> Result where D: Decoder { - let d = try!(decoder.as_list()); + let r = decoder.as_rlp(); let mut blockheader = Header { - parent_hash: try!(Decodable::decode(&d[0])), - uncles_hash: try!(Decodable::decode(&d[1])), - author: try!(Decodable::decode(&d[2])), - state_root: try!(Decodable::decode(&d[3])), - transactions_root: try!(Decodable::decode(&d[4])), - receipts_root: try!(Decodable::decode(&d[5])), - log_bloom: try!(Decodable::decode(&d[6])), - difficulty: try!(Decodable::decode(&d[7])), - number: try!(Decodable::decode(&d[8])), - gas_limit: try!(Decodable::decode(&d[9])), - gas_used: try!(Decodable::decode(&d[10])), - timestamp: try!(Decodable::decode(&d[11])), - extra_data: try!(Decodable::decode(&d[12])), + parent_hash: try!(r.val_at(0)), + uncles_hash: try!(r.val_at(1)), + author: try!(r.val_at(2)), + state_root: try!(r.val_at(3)), + transactions_root: try!(r.val_at(4)), + receipts_root: try!(r.val_at(5)), + log_bloom: try!(r.val_at(6)), + difficulty: try!(r.val_at(7)), + number: try!(r.val_at(8)), + gas_limit: try!(r.val_at(9)), + gas_used: try!(r.val_at(10)), + timestamp: try!(r.val_at(11)), + extra_data: try!(r.val_at(12)), seal: vec![], + hash: Cell::new(Some(r.raw().sha3())) }; - for i in 13..d.len() { - blockheader.seal.push(try!(Decodable::decode(&d[i]))); + for i in 13..r.item_count() { + blockheader.seal.push(try!(r.val_at(i))) } Ok(blockheader) diff --git a/src/sync/chain.rs b/src/sync/chain.rs index 7f6b99ccb..0b921f552 100644 --- a/src/sync/chain.rs +++ b/src/sync/chain.rs @@ -1,7 +1,7 @@ use std::collections::{HashSet, HashMap}; use std::cmp::{min, max}; use std::mem::{replace}; -use util::network::{PeerId, HandlerIo, PacketId}; +use util::network::{PeerId, PacketId}; use util::hash::{H256}; use util::bytes::{Bytes}; use util::uint::{U256}; @@ -10,23 +10,7 @@ use util::rlp::rlptraits::{Stream, View}; use util::sha3::Hashable; use eth::{BlockNumber, BlockChainClient, BlockHeader, BlockStatus, QueueStatus, ImportResult}; use sync::range_collection::{RangeCollection, ToUsize, FromUsize}; - -pub struct SyncIo<'s, 'h> where 'h:'s { - network: &'s mut HandlerIo<'h>, - chain: &'s mut BlockChainClient -} - -impl<'s, 'h> SyncIo<'s, 'h> { - pub fn new(network: &'s mut HandlerIo<'h>, chain: &'s mut BlockChainClient) -> SyncIo<'s,'h> { - SyncIo { - network: network, - chain: chain, - } - } - fn disable_peer(&mut self, peer_id: &PeerId) { - self.network.disable_peer(*peer_id); - } -} +use sync::{SyncIo}; impl ToUsize for BlockNumber { fn to_usize(&self) -> usize { @@ -106,7 +90,6 @@ pub struct SyncStatus { enum PeerAsking { Nothing, - State, BlockHeaders, BlockBodies, } @@ -213,8 +196,8 @@ impl ChainSync { self.starting_block = 0; self.highest_block = 0; self.have_common_block = false; - io.chain.clear_queue(); - self.starting_block = io.chain.info().last_block_number; + io.chain().clear_queue(); + self.starting_block = io.chain().info().last_block_number; self.state = SyncState::NotSynced; } @@ -263,7 +246,7 @@ impl ChainSync { if number > self.highest_block { self.highest_block = number; } - match io.chain.block_status(&info.hash()) { + match io.chain().block_status(&info.hash()) { BlockStatus::InChain => { self.have_common_block = true; self.last_imported_block = number; @@ -285,7 +268,7 @@ impl ChainSync { } } let hdr = Header { - data: r.at(i).data().to_vec(), + data: r.at(i).raw().to_vec(), hash: info.hash(), parent: info.parent_hash, }; @@ -298,7 +281,7 @@ impl ChainSync { //empty body, just mark as downloaded let mut body_stream = RlpStream::new_list(2); body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1); - body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1); + body_stream.append_raw(&rlp::NULL_RLP, 1); self.bodies.insert_item(number, body_stream.out()); } else { @@ -327,8 +310,8 @@ impl ChainSync { for i in 0..item_count { let body: Rlp = r.at(i); let tx = body.at(0); - let tx_root = ::util::triehash::ordered_trie_root(tx.iter().map(|r| r.data().to_vec()).collect()); //TODO: get rid of vectors here - let uncles = body.at(1).data().sha3(); + let tx_root = ::util::triehash::ordered_trie_root(tx.iter().map(|r| r.raw().to_vec()).collect()); //TODO: get rid of vectors here + let uncles = body.at(1).raw().sha3(); let header_id = HeaderId { transactions_root: tx_root, uncles: uncles @@ -336,7 +319,7 @@ impl ChainSync { match self.header_ids.get(&header_id).map(|n| *n) { Some(n) => { self.header_ids.remove(&header_id); - self.bodies.insert_item(n, body.data().to_vec()); + self.bodies.insert_item(n, body.raw().to_vec()); } None => { debug!(target: "sync", "Ignored unknown block body"); @@ -351,9 +334,9 @@ impl ChainSync { fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) { let block_rlp = r.at(0); let header_rlp = block_rlp.at(0); - let h = header_rlp.data().sha3(); + let h = header_rlp.raw().sha3(); - match io.chain.import_block(block_rlp.data()) { + match io.chain().import_block(block_rlp.raw()) { ImportResult::AlreadyInChain => { trace!(target: "sync", "New block already in chain {:?}", h); }, @@ -388,7 +371,7 @@ impl ChainSync { let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); let mut max_height: U256 = From::from(0); for (h, d) in hashes { - match io.chain.block_status(&h) { + match io.chain().block_status(&h) { BlockStatus::InChain => { trace!(target: "sync", "New block hash already in chain {:?}", h); }, @@ -458,7 +441,7 @@ impl ChainSync { (peer.latest.clone(), peer.difficulty.clone()) }; - let td = io.chain.info().pending_total_difficulty; + let td = io.chain().info().pending_total_difficulty; let syncing_difficulty = max(self.syncing_difficulty, td); if force || peer_difficulty > syncing_difficulty { // start sync @@ -476,7 +459,7 @@ impl ChainSync { fn request_blocks(&mut self, io: &mut SyncIo, peer_id: &PeerId) { self.clear_peer_download(peer_id); - if io.chain.queue_status().full { + if io.chain().queue_status().full { self.pause_sync(); return; } @@ -511,7 +494,7 @@ impl ChainSync { let mut start = 0usize; if !self.have_common_block { // download backwards until common block is found 1 header at a time - start = io.chain.info().last_block_number as usize; + start = io.chain().info().last_block_number as usize; if !self.headers.is_empty() { start = min(start, self.headers.range_iter().next().unwrap().0 as usize - 1); } @@ -585,7 +568,7 @@ impl ChainSync { block_rlp.append_raw(&headers.1[i].data, 1); block_rlp.append_raw(&bodies.1[i], 2); let h = &headers.1[i].hash; - match io.chain.import_block(&block_rlp.out()) { + match io.chain().import_block(&block_rlp.out()) { ImportResult::AlreadyInChain => { trace!(target: "sync", "Block already in chain {:?}", h); self.last_imported_block = headers.0 + i as BlockNumber; @@ -676,7 +659,7 @@ impl ChainSync { warn!(target:"sync", "Asking {:?} while requesting {:?}", asking, peer.asking); } } - match sync.network.send(*peer_id, packet_id, packet) { + match sync.send(*peer_id, packet_id, packet) { Err(e) => { warn!(target:"sync", "Error sending request: {:?}", e); sync.disable_peer(peer_id); @@ -694,13 +677,20 @@ impl ChainSync { fn send_status(&mut self, io: &mut SyncIo, peer_id: &PeerId) { let mut packet = RlpStream::new_list(5); - let chain = io.chain.info(); + let chain = io.chain().info(); packet.append(&(PROTOCOL_VERSION as u32)); packet.append(&0u32); //TODO: network id packet.append(&chain.total_difficulty); packet.append(&chain.last_block_hash); packet.append(&chain.genesis_hash); - self.send_request(io, peer_id, PeerAsking::State, STATUS_PACKET, packet.out()); + //TODO: handle timeout for status request + match io.send(*peer_id, STATUS_PACKET, packet.out()) { + Err(e) => { + warn!(target:"sync", "Error sending status request: {:?}", e); + io.disable_peer(peer_id); + } + Ok(_) => { } + } } fn return_block_headers(&self, io: &mut SyncIo, r: &Rlp) { @@ -709,12 +699,12 @@ impl ChainSync { let max_headers: usize = r.val_at(1); let skip: usize = r.val_at(2); let reverse: bool = r.val_at(3); - let last = io.chain.info().last_block_number; + let last = io.chain().info().last_block_number; let mut number = if r.at(0).size() == 32 { // id is a hash let hash: H256 = r.val_at(0); trace!(target: "sync", "GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse); - match io.chain.block_header(&hash) { + match io.chain().block_header(&hash) { Some(hdr) => From::from(rlp::decode::(&hdr).number), None => last } @@ -729,7 +719,7 @@ impl ChainSync { let mut count = 0; let mut data = Bytes::new(); while number < last && number > 1 && count < max_count { - match io.chain.block_header_at(number) { + match io.chain().block_header_at(number) { Some(mut hdr) => { data.append(&mut hdr); count += 1; @@ -745,7 +735,7 @@ impl ChainSync { } let mut rlp = RlpStream::new_list(count as usize); rlp.append_raw(&data, count as usize); - io.network.respond(BLOCK_HEADERS_PACKET, rlp.out()).unwrap_or_else(|e| + io.respond(BLOCK_HEADERS_PACKET, rlp.out()).unwrap_or_else(|e| debug!(target: "sync", "Error sending headers: {:?}", e)); } @@ -759,7 +749,7 @@ impl ChainSync { let mut added = 0usize; let mut data = Bytes::new(); for i in 0..count { - match io.chain.block_body(&r.val_at::(i)) { + match io.chain().block_body(&r.val_at::(i)) { Some(mut hdr) => { data.append(&mut hdr); added += 1; @@ -769,7 +759,7 @@ impl ChainSync { } let mut rlp = RlpStream::new_list(added); rlp.append_raw(&data, added); - io.network.respond(BLOCK_BODIES_PACKET, rlp.out()).unwrap_or_else(|e| + io.respond(BLOCK_BODIES_PACKET, rlp.out()).unwrap_or_else(|e| debug!(target: "sync", "Error sending headers: {:?}", e)); } @@ -783,7 +773,7 @@ impl ChainSync { let mut added = 0usize; let mut data = Bytes::new(); for i in 0..count { - match io.chain.state_data(&r.val_at::(i)) { + match io.chain().state_data(&r.val_at::(i)) { Some(mut hdr) => { data.append(&mut hdr); added += 1; @@ -793,7 +783,7 @@ impl ChainSync { } let mut rlp = RlpStream::new_list(added); rlp.append_raw(&data, added); - io.network.respond(NODE_DATA_PACKET, rlp.out()).unwrap_or_else(|e| + io.respond(NODE_DATA_PACKET, rlp.out()).unwrap_or_else(|e| debug!(target: "sync", "Error sending headers: {:?}", e)); } @@ -807,7 +797,7 @@ impl ChainSync { let mut added = 0usize; let mut data = Bytes::new(); for i in 0..count { - match io.chain.block_receipts(&r.val_at::(i)) { + match io.chain().block_receipts(&r.val_at::(i)) { Some(mut hdr) => { data.append(&mut hdr); added += 1; @@ -817,7 +807,7 @@ impl ChainSync { } let mut rlp = RlpStream::new_list(added); rlp.append_raw(&data, added); - io.network.respond(RECEIPTS_PACKET, rlp.out()).unwrap_or_else(|e| + io.respond(RECEIPTS_PACKET, rlp.out()).unwrap_or_else(|e| debug!(target: "sync", "Error sending headers: {:?}", e)); } @@ -834,7 +824,7 @@ impl ChainSync { NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), GET_NODE_DATA_PACKET => self.return_node_data(io, &rlp), GET_RECEIPTS_PACKET => self.return_receipts(io, &rlp), - _ => debug!(target: "sync", "Unkown packet {}", packet_id) + _ => debug!(target: "sync", "Unknown packet {}", packet_id) } } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3ec9c26ce..8efb86a43 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,7 +1,7 @@ use std::sync::{Arc}; use eth::{BlockChainClient}; -use util::network::{ProtocolHandler, NetworkService, HandlerIo, TimerToken, PeerId}; -use sync::chain::{ChainSync, SyncIo}; +use util::network::{ProtocolHandler, NetworkService, HandlerIo, TimerToken, PeerId, PacketId, Error as NetworkError}; +use sync::chain::{ChainSync}; mod chain; mod range_collection; @@ -9,7 +9,6 @@ mod range_collection; #[cfg(test)] mod tests; - pub fn new(_service: &mut NetworkService, eth_client: Arc) -> EthSync { EthSync { chain: eth_client, @@ -17,6 +16,45 @@ pub fn new(_service: &mut NetworkService, eth_client: Arc) -> Result<(), NetworkError>; + fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), NetworkError>; + fn chain<'s>(&'s mut self) -> &'s mut BlockChainClient; +} + +pub struct NetSyncIo<'s, 'h> where 'h:'s { + network: &'s mut HandlerIo<'h>, + chain: &'s mut BlockChainClient +} + +impl<'s, 'h> NetSyncIo<'s, 'h> { + pub fn new(network: &'s mut HandlerIo<'h>, chain: &'s mut BlockChainClient) -> NetSyncIo<'s,'h> { + NetSyncIo { + network: network, + chain: chain, + } + } +} + +impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> { + fn disable_peer(&mut self, peer_id: &PeerId) { + self.network.disable_peer(*peer_id); + } + + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), NetworkError>{ + self.network.respond(packet_id, data) + } + + fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), NetworkError>{ + self.network.send(peer_id, packet_id, data) + } + + fn chain<'a>(&'a mut self) -> &'a mut BlockChainClient { + self.chain + } +} + pub struct EthSync { chain: Arc, sync: ChainSync @@ -34,34 +72,34 @@ impl EthSync { } pub fn stop_network(&mut self, io: &mut HandlerIo) { - self.sync.abort(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); + self.sync.abort(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); } pub fn start_network(&mut self, io: &mut HandlerIo) { - self.sync.restart(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); + self.sync.restart(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); } } impl ProtocolHandler for EthSync { fn initialize(&mut self, io: &mut HandlerIo) { - self.sync.restart(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); + self.sync.restart(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); io.register_timer(1000).unwrap(); } fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]) { - self.sync.on_packet(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer, packet_id, data); + self.sync.on_packet(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer, packet_id, data); } fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId) { - self.sync.on_peer_connected(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer); + self.sync.on_peer_connected(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer); } fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId) { - self.sync.on_peer_aborting(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer); + self.sync.on_peer_aborting(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer); } fn timeout(&mut self, io: &mut HandlerIo, _timer: TimerToken) { - self.sync.maintain_sync(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); + self.sync.maintain_sync(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); } } diff --git a/src/sync/tests.rs b/src/sync/tests.rs index a51bf1543..7be66ff2f 100644 --- a/src/sync/tests.rs +++ b/src/sync/tests.rs @@ -1,13 +1,285 @@ -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use util::bytes::Bytes; -use util::hash::H256; +use util::hash::{H256, FixedHash}; +use util::uint::{U256}; +use util::sha3::Hashable; +use util::rlp::{self, Rlp, RlpStream, View, Stream}; +use util::network::{PeerId, PacketId, Error as NetworkError}; +use eth::{BlockChainClient, BlockStatus, BlockNumber, TreeRoute, BlockQueueStatus, BlockChainInfo, ImportResult, BlockHeader, QueueStatus}; +use sync::{SyncIo}; +use sync::chain::{ChainSync}; struct TestBlockChainClient { blocks: Vec, hashes: HashMap, + genesis_hash: H256, + last_hash: H256, + difficulty: U256 +} + +impl TestBlockChainClient { + fn new() -> TestBlockChainClient { + + let mut client = TestBlockChainClient { + blocks: Vec::new(), + hashes: HashMap::new(), + genesis_hash: H256::new(), + last_hash: H256::new(), + difficulty: From::from(0), + }; + client.add_blocks(1, true); // add genesis block + client.genesis_hash = client.last_hash; + client + } + + pub fn add_blocks(&mut self, count: usize, empty: bool) { + for n in self.blocks.len()..(self.blocks.len() + count) { + let mut header = BlockHeader::new(); + header.difficulty = From::from(n); + header.parent_hash = self.last_hash; + header.number = From::from(n); + let mut uncles = RlpStream::new_list(if empty {0} else {1}); + if !empty { + uncles.append(&H256::random()); + header.uncles_hash = uncles.raw().sha3(); + } + let mut rlp = RlpStream::new_list(3); + rlp.append(&header); + rlp.append_raw(uncles.raw(), 1); + rlp.append_raw(&rlp::NULL_RLP, 1); + self.import_block(rlp.raw()); + } + } +} + +impl BlockChainClient for TestBlockChainClient { + fn block_header(&self, h: &H256) -> Option { + self.hashes.get(h).and_then(|i| self.block_header_at(*i as BlockNumber)) + } + + fn block_body(&self, h: &H256) -> Option { + self.hashes.get(h).and_then(|i| self.block_body_at(*i as BlockNumber)) + } + + fn block(&self, h: &H256) -> Option { + self.hashes.get(h).map(|i| self.blocks[*i].clone()) + } + + fn block_status(&self, h: &H256) -> BlockStatus { + self.hashes.get(h).map(|i| self.block_status_at(*i as BlockNumber)).unwrap_or(BlockStatus::Unknown) + } + + fn block_header_at(&self, n: BlockNumber) -> Option { + self.blocks.get(n as usize).map(|r| Rlp::new(r).at(0).raw().to_vec()) + } + + fn block_body_at(&self, n: BlockNumber) -> Option { + self.blocks.get(n as usize).map(|r| { + let mut stream = RlpStream::new_list(2); + stream.append_raw(Rlp::new(&r).at(1).raw(), 1); + stream.append_raw(Rlp::new(&r).at(2).raw(), 1); + stream.out() + }) + } + + fn block_at(&self, n: BlockNumber) -> Option { + self.blocks.get(n as usize).map(|b| b.clone()) + } + + fn block_status_at(&self, n: BlockNumber) -> BlockStatus { + if (n as usize) < self.blocks.len() { + BlockStatus::InChain + } else { + BlockStatus::Unknown + } + } + + fn tree_route(&self, _from: &H256, _to: &H256) -> TreeRoute { + TreeRoute { + blocks: Vec::new(), + ancestor: H256::new(), + index: 0 + } + } + + fn state_data(&self, _h: &H256) -> Option { + None + } + + fn block_receipts(&self, _h: &H256) -> Option { + None + } + + fn import_block(&mut self, b: &[u8]) -> ImportResult { + let header = Rlp::new(&b).val_at::(0); + if header.number != From::from(self.blocks.len()) { + panic!("Unexpected block number"); + } + if !self.blocks.is_empty() { + let parent = Rlp::new(self.blocks.last().unwrap()).val_at::(0); + if header.parent_hash != parent.hash() { + panic!("Unexpected block header"); + } + } + self.difficulty = self.difficulty + header.difficulty; + self.last_hash = header.hash(); + self.hashes.insert(header.hash(), self.blocks.len()); + self.blocks.push(b.to_vec()); + ImportResult::Queued(QueueStatus::Known) + } + + fn queue_status(&self) -> BlockQueueStatus { + BlockQueueStatus { + full: false, + } + } + + fn clear_queue(&mut self) { + } + + fn info(&self) -> BlockChainInfo { + BlockChainInfo { + total_difficulty: self.difficulty, + pending_total_difficulty: self.difficulty, + genesis_hash: self.genesis_hash, + last_block_hash: self.last_hash, + last_block_number: self.blocks.len() as BlockNumber - 1, + } + } +} + +struct TestIo<'p> { + chain: &'p mut TestBlockChainClient, + queue: &'p mut VecDeque, + sender: Option, +} + +impl<'p> TestIo<'p> { + fn new(chain: &'p mut TestBlockChainClient, queue: &'p mut VecDeque, sender: Option) -> TestIo<'p> { + TestIo { + chain: chain, + queue: queue, + sender: sender + } + } +} + +impl<'p> SyncIo for TestIo<'p> { + fn disable_peer(&mut self, _peer_id: &PeerId) { + } + + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), NetworkError> { + self.queue.push_back(TestPacket { + data: data, + packet_id: packet_id, + recipient: self.sender.unwrap() + }); + Ok(()) + } + + fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), NetworkError> { + self.queue.push_back(TestPacket { + data: data, + packet_id: packet_id, + recipient: peer_id, + }); + Ok(()) + } + + fn chain<'a>(&'a mut self) -> &'a mut BlockChainClient { + self.chain + } +} + +struct TestPacket { + data: Bytes, + packet_id: PacketId, + recipient: PeerId, +} + +struct TestPeer { + chain: TestBlockChainClient, + sync: ChainSync, + queue: VecDeque, +} + +struct TestNet { + peers: Vec +} + +impl TestNet { + pub fn new(n: usize) -> TestNet { + let mut net = TestNet { + peers: Vec::new(), + }; + for _ in 0..n { + net.peers.push(TestPeer { + chain: TestBlockChainClient::new(), + sync: ChainSync::new(), + queue: VecDeque::new(), + }); + } + net + } + + pub fn peer(&mut self, i: usize) -> &mut TestPeer { + self.peers.get_mut(i).unwrap() + } + + pub fn start(&mut self) { + for peer in 0..self.peers.len() { + for client in 0..self.peers.len() { + if peer != client { + let mut p = self.peers.get_mut(peer).unwrap(); + p.sync.on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), &(client as PeerId)); + } + } + } + } + + pub fn sync_step(&mut self) { + for peer in 0..self.peers.len() { + match self.peers[peer].queue.pop_front() { + Some(packet) => { + let mut p = self.peers.get_mut(packet.recipient).unwrap(); + p.sync.on_packet(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), &(peer as PeerId), packet.packet_id, &packet.data); + }, + None => {} + } + let mut p = self.peers.get_mut(peer).unwrap(); + p.sync.maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); + } + } + + pub fn sync(&mut self) { + self.start(); + while !self.done() { + self.sync_step() + } + } + + pub fn done(&self) -> bool { + self.peers.iter().all(|p| p.queue.is_empty()) + } } #[test] -fn full_sync() { +fn full_sync_two_peers() { + let mut net = TestNet::new(3); + net.peer(1).chain.add_blocks(1000, false); + net.peer(2).chain.add_blocks(1000, false); + net.sync(); + assert_eq!(net.peer(0).chain.block_at(50000), net.peer(1).chain.block_at(50000)); +} + +#[test] +fn full_sync_empty_blocks() { + let mut net = TestNet::new(3); + for n in 0..200 { + net.peer(1).chain.add_blocks(5, n % 2 == 0); + net.peer(2).chain.add_blocks(5, n % 2 == 0); + } + net.sync(); + assert_eq!(net.peer(0).chain.block_at(50000), net.peer(1).chain.block_at(50000)); }