From ca6c91f591e62c5b5aaa1e105572370bae0a57d0 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 16 May 2016 14:41:41 +0200 Subject: [PATCH] New sync algorithm --- Cargo.lock | 6 +- ethcore/src/client/test_client.rs | 20 +- rpc/src/v1/impls/eth.rs | 4 +- rpc/src/v1/tests/helpers/sync_provider.rs | 2 +- sync/src/chain.rs | 921 ++++++++++------------ sync/src/lib.rs | 5 +- sync/src/range_collection.rs | 317 -------- sync/src/tests/chain.rs | 30 +- sync/src/tests/helpers.rs | 6 +- 9 files changed, 487 insertions(+), 824 deletions(-) delete mode 100644 sync/src/range_collection.rs diff --git a/Cargo.lock b/Cargo.lock index 7f336c2c3..45266cc5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -332,7 +332,7 @@ dependencies = [ "mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "nix 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", - "rocksdb 0.4.3", + "rocksdb 0.4.3 (git+https://github.com/ethcore/rust-rocksdb)", "rust-crypto 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -628,6 +628,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "librocksdb-sys" version = "0.2.3" +source = "git+https://github.com/ethcore/rust-rocksdb#6b6ce93e2828182691e00da57fdfb2926226f1f1" dependencies = [ "libc 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -969,9 +970,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "rocksdb" version = "0.4.3" +source = "git+https://github.com/ethcore/rust-rocksdb#6b6ce93e2828182691e00da57fdfb2926226f1f1" dependencies = [ "libc 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", - "librocksdb-sys 0.2.3", + "librocksdb-sys 0.2.3 (git+https://github.com/ethcore/rust-rocksdb)", ] [[package]] diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index e05ed789f..014566b6f 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -191,11 +191,23 @@ impl TestBlockChainClient { } } - /// TODO: + /// Make a bad block by setting invalid extra data. pub fn corrupt_block(&mut self, n: BlockNumber) { let hash = self.block_hash(BlockID::Number(n)).unwrap(); let mut header: BlockHeader = decode(&self.block_header(BlockID::Number(n)).unwrap()); - header.parent_hash = H256::new(); + header.extra_data = b"This extra data is way too long to be considered valid".to_vec(); + let mut rlp = RlpStream::new_list(3); + rlp.append(&header); + rlp.append_raw(&rlp::NULL_RLP, 1); + rlp.append_raw(&rlp::NULL_RLP, 1); + self.blocks.write().unwrap().insert(hash, rlp.out()); + } + + /// Make a bad block by setting invalid parent hash. + pub fn corrupt_block_parent(&mut self, n: BlockNumber) { + let hash = self.block_hash(BlockID::Number(n)).unwrap(); + let mut header: BlockHeader = decode(&self.block_header(BlockID::Number(n)).unwrap()); + header.parent_hash = H256::from(42); let mut rlp = RlpStream::new_list(3); rlp.append(&header); rlp.append_raw(&rlp::NULL_RLP, 1); @@ -229,8 +241,8 @@ impl BlockChainClient for TestBlockChainClient { Some(U256::zero()) } - fn block_hash(&self, _id: BlockID) -> Option { - unimplemented!(); + fn block_hash(&self, id: BlockID) -> Option { + Self::block_hash(self, id) } fn nonce(&self, address: &Address) -> U256 { diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 4c2eee3a9..636b1a8da 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -233,8 +233,8 @@ impl Eth for EthClient Params::None => { let status = take_weak!(self.sync).status(); let res = match status.state { - SyncState::NotSynced | SyncState::Idle => SyncStatus::None, - SyncState::Waiting | SyncState::Blocks | SyncState::NewBlocks => SyncStatus::Info(SyncInfo { + SyncState::Idle => SyncStatus::None, + SyncState::Waiting | SyncState::Blocks | SyncState::NewBlocks | SyncState::ChainHead => SyncStatus::Info(SyncInfo { starting_block: U256::from(status.start_block_number), current_block: U256::from(take_weak!(self.client).chain_info().best_block_number), highest_block: U256::from(status.highest_block_number.unwrap_or(status.start_block_number)) diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index 633e0d45b..fc81586dd 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -39,7 +39,7 @@ impl TestSyncProvider { pub fn new(config: Config) -> Self { TestSyncProvider { status: RwLock::new(SyncStatus { - state: SyncState::NotSynced, + state: SyncState::Idle, network_id: config.network_id, protocol_version: 63, start_block_number: 0, diff --git a/sync/src/chain.rs b/sync/src/chain.rs index b733aa3ed..01f6749f9 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -14,27 +14,86 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . + + /// /// `BlockChain` synchronization strategy. /// Syncs to peers and keeps up to date. /// This implementation uses ethereum protocol v63 /// -/// Syncing strategy. +/// Syncing strategy summary. +/// Split the chain into ranges of N blocks each. Download ranges sequentially. Split each range into subchains of M blocks. Download subchains in parallel. +/// State. +/// Sync state consists of the following data: +/// - s: State enum which can be one of the following values: ChainHead, Blocks, Idle +/// - H: A set of downloaded block headers +/// - B: A set of downloaded block bodies +/// - S: Set of block subchain start block hashes to download. +/// - l: Last imported / common block hash +/// - P: A set of connected peers. For each peer we maintain its last known total difficulty and starting block hash being requested if any. +/// General behaviour. +/// We start with all sets empty, l is set to the best block in the block chain, s is set to ChainHead. +/// If at any moment a bad block is reported by the block queue, we set s to ChainHead, reset l to the best block in the block chain and clear H, B and S. +/// If at any moment P becomes empty, we set s to ChainHead, and clear H, B and S. /// -/// 1. A peer arrives with a total difficulty better than ours -/// 2. Find a common best block between our an peer chain. -/// Start with out best block and request headers from peer backwards until a common block is found -/// 3. Download headers and block bodies from peers in parallel. -/// As soon as a set of the blocks is fully downloaded at the head of the queue it is fed to the blockchain -/// 4. Maintain sync by handling `NewBlocks/NewHashes` messages +/// Workflow for ChainHead state. +/// In this state we try to get subchain headers with a single GetBlockHeaders request. +/// On NewPeer / On Restart: +/// If peer's total difficulty is higher, request N/M headers with interval M+1 starting from l +/// On BlockHeaders(R): +/// If R is empty: +/// If l is equal to genesis block hash or l is more than 1000 blocks behind our best hash: +/// Remove current peer from P. set l to the best block in the block chain. Select peer with maximum total difficulty from P and restart. +/// Else +/// Set l to l’s parent and restart. +/// Else if we already have all the headers in the block chain or the block queue: +/// Set s to Idle, +/// Else +/// Set S to R, set s to Blocks. +/// +/// +/// All other messages are ignored. +/// Workflow for Blocks state. +/// In this state we download block headers and bodies from multiple peers. +/// On NewPeer / On Restart: +/// For all idle peers: +/// Find a set of 256 or less block hashes in H which are not in B and not being downloaded by other peers. If the set is not empty: +/// Request block bodies for the hashes in the set. +/// Else +/// Find an element in S which is not being downloaded by other peers. If found: Request M headers starting from the element. +/// +/// On BlockHeaders(R): +/// If R is empty remove current peer from P and restart. +/// Validate received headers. For each header find a parent in H or R or the blockchain. Restart if there is a block with unknown parent. +/// Go to CollectBlocks. +/// +/// On BlockBodies(R): +/// If R is empty remove current peer from P and restart. +/// Add bodies with a matching header in H to B. +/// Go to CollectBlocks. +/// +/// CollectBlocks: +/// Find a chain of blocks C in H starting from h where h’s parent equals to l. The chain ends with the first block which does not have a body in B. +/// Add all blocks from the chain to the block queue. Remove them from H and B. Set l to the hash of the last block from C. +/// Update and merge subchain heads in S. For each h in S find a chain of blocks in B starting from h. Remove h from S. if the chain does not include an element from S add the end of the chain to S. +/// If H is empty and S contains a single element set s to ChainHead. +/// Restart. +/// +/// All other messages are ignored. +/// Workflow for Idle state. +/// On NewBlock: +/// Import the block. If the block is unknown set s to ChainHead and restart. +/// On NewHashes: +/// Set s to ChainHead and restart. +/// +/// All other messages are ignored. /// use util::*; use std::mem::{replace}; -use ethcore::views::{HeaderView}; +use ethcore::views::{HeaderView, BlockView}; use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo}; -use range_collection::{RangeCollection, ToUsize, FromUsize}; use ethcore::error::*; use ethcore::transaction::SignedTransaction; use ethcore::block::Block; @@ -42,20 +101,9 @@ use ethminer::{Miner, MinerService, AccountDetails}; use io::SyncIo; use time; use super::SyncConfig; +use blocks::BlockCollection; -known_heap_size!(0, PeerInfo, Header, HeaderId); - -impl ToUsize for BlockNumber { - fn to_usize(&self) -> usize { - *self as usize - } -} - -impl FromUsize for BlockNumber { - fn from_usize(s: usize) -> BlockNumber { - s as BlockNumber - } -} +known_heap_size!(0, PeerInfo); type PacketDecodeError = DecoderError; @@ -65,7 +113,7 @@ const MAX_HEADERS_TO_SEND: usize = 512; const MAX_NODE_DATA_TO_SEND: usize = 1024; const MAX_RECEIPTS_TO_SEND: usize = 1024; const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256; -const MAX_HEADERS_TO_REQUEST: usize = 512; +const MAX_HEADERS_TO_REQUEST: usize = 256; const MAX_BODIES_TO_REQUEST: usize = 64; const MIN_PEERS_PROPAGATION: usize = 4; const MAX_PEERS_PROPAGATION: usize = 128; @@ -87,27 +135,11 @@ const RECEIPTS_PACKET: u8 = 0x10; const CONNECTION_TIMEOUT_SEC: f64 = 10f64; -struct Header { - /// Header data - data: Bytes, - /// Block hash - hash: H256, - /// Parent hash - parent: H256, -} - -/// Used to identify header by transactions and uncles hashes -#[derive(Eq, PartialEq, Hash)] -struct HeaderId { - transactions_root: H256, - uncles: H256 -} - #[derive(Copy, Clone, Eq, PartialEq, Debug)] /// Sync state pub enum SyncState { - /// Initial chain sync has not started yet - NotSynced, + /// Downloading subchain heads + ChainHead, /// Initial chain sync complete. Waiting for new packets Idle, /// Block downloading paused. Waiting for block queue to process blocks and free some space @@ -151,6 +183,7 @@ enum PeerAsking { Nothing, BlockHeaders, BlockBodies, + Heads, } #[derive(Clone)] @@ -166,12 +199,12 @@ struct PeerInfo { latest_hash: H256, /// Peer best block number if known latest_number: Option, - /// Peer total difficulty - difficulty: U256, + /// Peer total difficulty if known + difficulty: Option, /// Type of data currenty being requested from peer. asking: PeerAsking, /// A set of block numbers being requested - asking_blocks: Vec, + asking_blocks: Vec, /// Holds requested header hash if currently requesting block header by hash asking_hash: Option, /// Request timestamp @@ -187,32 +220,24 @@ pub struct ChainSync { starting_block: BlockNumber, /// Highest block number seen highest_block: Option, - /// Set of block header numbers being downloaded - downloading_headers: HashSet, - /// Set of block body numbers being downloaded - downloading_bodies: HashSet, - /// Set of block headers being downloaded by hash - downloading_hashes: HashSet, - /// Downloaded headers. - headers: Vec<(BlockNumber, Vec
)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order - /// Downloaded bodies - bodies: Vec<(BlockNumber, Vec)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order - /// Peer info + /// All connected peers peers: HashMap, - /// Used to map body to header - header_ids: HashMap, + /// Peers active for current sync round + active_peers: HashSet, + /// Downloaded blocks, holds `H`, `B` and `S` + blocks: BlockCollection, /// Last impoted block number - last_imported_block: Option, + last_imported_block: BlockNumber, /// Last impoted block hash - last_imported_hash: Option, + last_imported_hash: H256, /// Syncing total difficulty syncing_difficulty: U256, - /// True if common block for our and remote chain has been found - have_common_block: bool, /// Last propagated block number last_sent_block_number: BlockNumber, /// Max blocks to download ahead - max_download_ahead_blocks: usize, + _max_download_ahead_blocks: usize, + /// Number of blocks imported this round + imported_this_round: Option, /// Network ID network_id: U256, /// Miner @@ -223,27 +248,26 @@ type RlpResponseResult = Result, PacketDecodeError impl ChainSync { /// Create a new instance of syncing strategy. - pub fn new(config: SyncConfig, miner: Arc) -> ChainSync { - ChainSync { - state: SyncState::NotSynced, + pub fn new(config: SyncConfig, miner: Arc, chain: &BlockChainClient) -> ChainSync { + let chain = chain.chain_info(); + let mut sync = ChainSync { + state: SyncState::ChainHead, starting_block: 0, highest_block: None, - downloading_headers: HashSet::new(), - downloading_bodies: HashSet::new(), - downloading_hashes: HashSet::new(), - headers: Vec::new(), - bodies: Vec::new(), + last_imported_block: chain.best_block_number, + last_imported_hash: chain.best_block_hash, peers: HashMap::new(), - header_ids: HashMap::new(), - last_imported_block: None, - last_imported_hash: None, + active_peers: HashSet::new(), + blocks: BlockCollection::new(), syncing_difficulty: U256::from(0u64), - have_common_block: false, last_sent_block_number: 0, - max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), + imported_this_round: None, + _max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, miner: miner, - } + }; + sync.reset(); + sync } /// @returns Synchonization status @@ -253,21 +277,18 @@ impl ChainSync { protocol_version: 63, network_id: self.network_id, start_block_number: self.starting_block, - last_imported_block_number: self.last_imported_block, + last_imported_block_number: Some(self.last_imported_block), highest_block_number: self.highest_block, - blocks_received: match self.last_imported_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 }, + blocks_received: if self.last_imported_block > self.starting_block { self.last_imported_block - self.starting_block } else { 0 }, blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 }, num_peers: self.peers.len(), num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), mem_used: // TODO: https://github.com/servo/heapsize/pull/50 - // self.downloading_hashes.heap_size_of_children() //+ self.downloading_bodies.heap_size_of_children() - //+ self.downloading_hashes.heap_size_of_children() - self.headers.heap_size_of_children() - + self.bodies.heap_size_of_children() - + self.peers.heap_size_of_children() - + self.header_ids.heap_size_of_children(), + //+ self.downloading_headers.heap_size_of_children() + self.blocks.heap_size() + + self.peers.heap_size_of_children(), } } @@ -278,44 +299,55 @@ impl ChainSync { } #[cfg_attr(feature="dev", allow(for_kv_map))] // Because it's not possible to get `values_mut()` - /// Rest sync. Clear all downloaded data but keep the queue + /// Reset sync. Clear all downloaded data but keep the queue fn reset(&mut self) { - self.downloading_headers.clear(); - self.downloading_bodies.clear(); - self.headers.clear(); - self.bodies.clear(); + self.blocks.clear(); for (_, ref mut p) in &mut self.peers { p.asking_blocks.clear(); p.asking_hash = None; } - self.header_ids.clear(); self.syncing_difficulty = From::from(0u64); self.state = SyncState::Idle; + self.blocks.clear(); + self.active_peers = self.peers.keys().cloned().collect(); } /// Restart sync pub fn restart(&mut self, io: &mut SyncIo) { + trace!(target: "sync", "Restarting"); self.reset(); - self.starting_block = 0; - self.highest_block = None; - self.have_common_block = false; - self.starting_block = io.chain().chain_info().best_block_number; - self.state = SyncState::NotSynced; + self.start_sync_round(io); + self.continue_sync(io); + } + + /// Remove peer from active peer set + fn deactivate_peer(&mut self, io: &mut SyncIo, peer_id: PeerId) { + self.active_peers.remove(&peer_id); + if self.active_peers.is_empty() { + trace!(target: "sync", "No more active peers"); + if self.state == SyncState::ChainHead { + self.complete_sync(); + } else { + self.restart(io); + } + } } /// Restart sync after bad block has been detected. May end up re-downloading up to QUEUE_SIZE blocks - pub fn restart_on_bad_block(&mut self, io: &mut SyncIo) { - self.restart(io); + fn restart_on_bad_block(&mut self, io: &mut SyncIo) { // Do not assume that the block queue/chain still has our last_imported_block - self.last_imported_block = None; - self.last_imported_hash = None; + let chain = io.chain().chain_info(); + self.last_imported_block = chain.best_block_number; + self.last_imported_hash = chain.best_block_hash; + self.restart(io); } + /// Called by peer to report status fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { let peer = PeerInfo { protocol_version: try!(r.val_at(0)), network_id: try!(r.val_at(1)), - difficulty: try!(r.val_at(2)), + difficulty: Some(try!(r.val_at(2))), latest_hash: try!(r.val_at(3)), latest_number: None, genesis: try!(r.val_at(4)), @@ -334,16 +366,17 @@ impl ChainSync { let chain_info = io.chain().chain_info(); if peer.genesis != chain_info.genesis_hash { io.disable_peer(peer_id); - trace!(target: "sync", "Peer {} genesis hash not matched", peer_id); + trace!(target: "sync", "Peer {} genesis hash mismatch (ours: {}, theirs: {})", peer_id, chain_info.genesis_hash, peer.genesis); return Ok(()); } if peer.network_id != self.network_id { io.disable_peer(peer_id); - trace!(target: "sync", "Peer {} network id not matched", peer_id); + trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, self.network_id, peer.network_id); return Ok(()); } self.peers.insert(peer_id.clone(), peer); + self.active_peers.insert(peer_id.clone()); debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); self.sync_peer(io, peer_id, false); Ok(()) @@ -352,24 +385,37 @@ impl ChainSync { #[cfg_attr(feature="dev", allow(cyclomatic_complexity))] /// Called by peer once it has new block headers during sync fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders); + self.clear_peer_download(peer_id); + let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders }; + if !self.reset_peer_asking(peer_id, expected_asking) { + trace!(target: "sync", "Ignored unexpected headers"); + self.continue_sync(io); + return Ok(()); + } let item_count = r.item_count(); trace!(target: "sync", "{} -> BlockHeaders ({} entries)", peer_id, item_count); - self.clear_peer_download(peer_id); - if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { + if self.state == SyncState::Idle { trace!(target: "sync", "Ignored unexpected block headers"); + self.continue_sync(io); return Ok(()); } if self.state == SyncState::Waiting { trace!(target: "sync", "Ignored block headers while waiting"); + self.continue_sync(io); + return Ok(()); + } + if item_count == 0 && (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) { + self.deactivate_peer(io, peer_id); //TODO: is this too harsh? return Ok(()); } + let mut headers = Vec::new(); + let mut hashes = Vec::new(); for i in 0..item_count { let info: BlockHeader = try!(r.val_at(i)); let number = BlockNumber::from(info.number); - if (number <= self.current_base_block() && self.have_common_block) || self.headers.have_item(&number) { - trace!(target: "sync", "Skipping existing block header"); + if self.blocks.contains(&info.hash()) { + trace!(target: "sync", "Skipping existing block header {} ({:?})", number, info.hash()); continue; } @@ -379,72 +425,46 @@ impl ChainSync { let hash = info.hash(); match io.chain().block_status(BlockID::Hash(hash.clone())) { BlockStatus::InChain | BlockStatus::Queued => { - if !self.have_common_block || self.current_base_block() < number { - self.last_imported_block = Some(number); - self.last_imported_hash = Some(hash.clone()); - } - if !self.have_common_block { - self.have_common_block = true; - trace!(target: "sync", "Found common header {} ({})", number, hash); - } else { - trace!(target: "sync", "Header already in chain {} ({}), restarting", number, hash); - self.restart(io); - self.continue_sync(io); - return Ok(()); + match self.state { + SyncState::Blocks | SyncState::NewBlocks => trace!(target: "sync", "Header already in chain {} ({})", number, hash), + _ => trace!(target: "sync", "Unexpected header already in chain {} ({}), state = {:?}", number, hash, self.state), } + headers.push(try!(r.at(i)).as_raw().to_vec()); + hashes.push(hash); }, - _ => { - if self.have_common_block { - //validate chain - let base_hash = self.last_imported_hash.clone().unwrap(); - if self.have_common_block && number == self.current_base_block() + 1 && info.parent_hash != base_hash { - // Part of the forked chain. Restart to find common block again - debug!(target: "sync", "Mismatched block header {} {}, restarting sync", number, hash); - self.restart(io); - return Ok(()); - } - if self.headers.find_item(&(number - 1)).map_or(false, |p| p.hash != info.parent_hash) { - // mismatching parent id, delete the previous block and don't add this one - debug!(target: "sync", "Mismatched block header {} {}", number, hash); - self.remove_downloaded_blocks(number - 1); - continue; - } - if self.headers.find_item(&(number + 1)).map_or(false, |p| p.parent != hash) { - // mismatching parent id for the next block, clear following headers - debug!(target: "sync", "Mismatched block header {}", number + 1); - self.remove_downloaded_blocks(number + 1); - } - if self.have_common_block && number < self.current_base_block() + 1 { - // unkown header - debug!(target: "sync", "Old block header {:?} ({}) is unknown, restarting sync", hash, number); - self.restart(io); - return Ok(()); - } - } - let hdr = Header { - data: try!(r.at(i)).as_raw().to_vec(), - hash: hash.clone(), - parent: info.parent_hash, - }; - self.headers.insert_item(number, hdr); - let header_id = HeaderId { - transactions_root: info.transactions_root, - uncles: info.uncles_hash - }; - trace!(target: "sync", "Got header {} ({})", number, hash); - if header_id.transactions_root == rlp::SHA3_NULL_RLP && header_id.uncles == rlp::SHA3_EMPTY_LIST_RLP { - //empty body, just mark as downloaded - let mut body_stream = RlpStream::new_list(2); - body_stream.append_raw(&rlp::NULL_RLP, 1); - body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1); - self.bodies.insert_item(number, body_stream.out()); - } - else { - self.header_ids.insert(header_id, number); - } + BlockStatus::Bad => { + warn!(target: "sync", "Bad header {} ({}) from {}: {}, state = {:?}", number, hash, peer_id, io.peer_info(peer_id), self.state); + io.disable_peer(peer_id); + return Ok(()); + }, + BlockStatus::Unknown => { + headers.push(try!(r.at(i)).as_raw().to_vec()); + hashes.push(hash); } } } + + match self.state { + SyncState::ChainHead => { + if headers.is_empty() { + // peer is not on our chain + // track back and try again + self.imported_this_round = Some(0); + self.start_sync_round(io); + } else { + // TODO: validate heads better. E.g. check that there is enough distance between blocks. + trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); + self.blocks.reset_to(hashes); + self.state = SyncState::Blocks; + } + }, + SyncState::Blocks | SyncState::NewBlocks | SyncState::Waiting => { + trace!(target: "sync", "Inserted {} headers", headers.len()); + self.blocks.insert_headers(headers); + }, + _ => trace!(target: "sync", "Unexpected headers({}) from {} ({}), state = {:?}", headers.len(), peer_id, io.peer_info(peer_id), self.state) + } + self.collect_blocks(io); self.continue_sync(io); Ok(()) @@ -452,46 +472,28 @@ impl ChainSync { /// Called by peer once it has new block bodies fn on_peer_block_bodies(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - use util::triehash::ordered_trie_root; + self.clear_peer_download(peer_id); self.reset_peer_asking(peer_id, PeerAsking::BlockBodies); let item_count = r.item_count(); trace!(target: "sync", "{} -> BlockBodies ({} entries)", peer_id, item_count); - self.clear_peer_download(peer_id); - if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { - trace!(target: "sync", "Ignored unexpected block bodies"); - return Ok(()); - } - if self.state == SyncState::Waiting { - trace!(target: "sync", "Ignored block bodies while waiting"); - return Ok(()); - } if item_count == 0 { - trace!(target: "sync", "No bodies returned, restarting"); - self.restart(io); - self.continue_sync(io); - return Ok(()); + self.deactivate_peer(io, peer_id); } - for i in 0..item_count { - let body = try!(r.at(i)); - let tx = try!(body.at(0)); - let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw().to_vec()).collect()); //TODO: get rid of vectors here - let uncles = try!(body.at(1)).as_raw().sha3(); - let header_id = HeaderId { - transactions_root: tx_root, - uncles: uncles - }; - match self.header_ids.get(&header_id).cloned() { - Some(n) => { - self.header_ids.remove(&header_id); - self.bodies.insert_item(n, body.as_raw().to_vec()); - trace!(target: "sync", "Got body {}", n); - } - None => { - trace!(target: "sync", "Ignored unknown/stale block body"); - } + else if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { + trace!(target: "sync", "Ignored unexpected block bodies"); + } + else if self.state == SyncState::Waiting { + trace!(target: "sync", "Ignored block bodies while waiting"); + } + else + { + let mut bodies = Vec::with_capacity(item_count); + for i in 0..item_count { + bodies.push(try!(r.at(i)).as_raw().to_vec()); } + self.blocks.insert_bodies(bodies); + self.collect_blocks(io); } - self.collect_blocks(io); self.continue_sync(io); Ok(()) } @@ -503,19 +505,18 @@ impl ChainSync { let header_rlp = try!(block_rlp.at(0)); let h = header_rlp.as_raw().sha3(); trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); - if !self.have_common_block { + if self.state != SyncState::Idle { trace!(target: "sync", "NewBlock ignored while seeking"); return Ok(()); } let header: BlockHeader = try!(header_rlp.as_val()); - let mut unknown = false; + let mut unknown = false; { let peer = self.peers.get_mut(&peer_id).unwrap(); peer.latest_hash = header.hash(); peer.latest_number = Some(header.number()); } - // TODO: Decompose block and add to self.headers and self.bodies instead - if header.number <= From::from(self.current_base_block() + 1) { + if header.number <= self.last_imported_block + 1 { match io.chain().import_block(block_rlp.as_raw().to_vec()) { Err(Error::Import(ImportError::AlreadyInChain)) => { trace!(target: "sync", "New block already in chain {:?}", h); @@ -524,10 +525,9 @@ impl ChainSync { trace!(target: "sync", "New block already queued {:?}", h); }, Ok(_) => { - if self.current_base_block() < header.number { - self.last_imported_block = Some(header.number); - self.last_imported_hash = Some(header.hash()); - self.remove_downloaded_blocks(header.number); + if header.number == self.last_imported_block + 1 { + self.last_imported_block = header.number; + self.last_imported_hash = header.hash(); } trace!(target: "sync", "New block queued {:?} ({})", h, header.number); }, @@ -541,35 +541,39 @@ impl ChainSync { } }; } - else { + else { unknown = true; } if unknown { - trace!(target: "sync", "New block unknown {:?}", h); + trace!(target: "sync", "New unknown block {:?}", h); //TODO: handle too many unknown blocks let difficulty: U256 = try!(r.val_at(1)); - let peer_difficulty = self.peers.get_mut(&peer_id).unwrap().difficulty; - if difficulty > peer_difficulty { - trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); - self.sync_peer(io, peer_id, true); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if peer.difficulty.map_or(true, |pd| difficulty > pd) { + //self.state = SyncState::ChainHead; + peer.difficulty = Some(difficulty); + trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); + } } + self.sync_peer(io, peer_id, true); } Ok(()) } /// Handles `NewHashes` packet. Initiates headers download for any unknown hashes. fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - if self.peers.get_mut(&peer_id).unwrap().asking != PeerAsking::Nothing { + if self.state != SyncState::Idle { trace!(target: "sync", "Ignoring new hashes since we're already downloading."); return Ok(()); } trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()); let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); let mut max_height: BlockNumber = 0; + let mut new_hashes = Vec::new(); for (rh, rd) in hashes { let h = try!(rh); let d = try!(rd); - if self.downloading_hashes.contains(&h) { + if self.blocks.is_downloading(&h) { continue; } match io.chain().block_status(BlockID::Hash(h.clone())) { @@ -580,6 +584,7 @@ impl ChainSync { trace!(target: "sync", "New hash block already queued {:?}", h); }, BlockStatus::Unknown => { + new_hashes.push(h.clone()); if d > max_height { trace!(target: "sync", "New unknown block hash {:?}", h); let peer = self.peers.get_mut(&peer_id).unwrap(); @@ -588,7 +593,7 @@ impl ChainSync { max_height = d; } }, - BlockStatus::Bad =>{ + BlockStatus::Bad => { debug!(target: "sync", "Bad new block hash {:?}", h); io.disable_peer(peer_id); return Ok(()); @@ -596,6 +601,9 @@ impl ChainSync { } }; if max_height != 0 { + trace!(target: "sync", "Downloading blocks for new hashes"); + self.blocks.reset_to(new_hashes); + self.state = SyncState::NewBlocks; self.sync_peer(io, peer_id, true); } Ok(()) @@ -603,18 +611,19 @@ impl ChainSync { /// Called by peer when it is disconnecting pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { - trace!(target: "sync", "== Disconnecting {}", peer); + trace!(target: "sync", "== Disconnecting {}: {}", peer, io.peer_info(peer)); if self.peers.contains_key(&peer) { debug!(target: "sync", "Disconnected {}", peer); self.clear_peer_download(peer); self.peers.remove(&peer); + self.active_peers.remove(&peer); self.continue_sync(io); } } /// Called when a new peer is connected pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) { - trace!(target: "sync", "== Connected {}", peer); + trace!(target: "sync", "== Connected {}: {}", peer, io.peer_info(peer)); if let Err(e) = self.send_status(io) { debug!(target:"sync", "Error sending status request: {:?}", e); io.disable_peer(peer); @@ -623,14 +632,19 @@ impl ChainSync { /// Resume downloading fn continue_sync(&mut self, io: &mut SyncIo) { - let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty)).collect(); + let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect(); peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating for (p, _) in peers { - self.sync_peer(io, p, false); + if self.active_peers.contains(&p) { + self.sync_peer(io, p, false); + } + } + if !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) { + self.complete_sync(); } } - /// Called after all blocks have been donloaded + /// Called after all blocks have been downloaded fn complete_sync(&mut self) { trace!(target: "sync", "Sync complete"); self.reset(); @@ -656,258 +670,169 @@ impl ChainSync { } (peer.latest_hash.clone(), peer.difficulty.clone()) }; - - let td = io.chain().chain_info().pending_total_difficulty; + let chain_info = io.chain().chain_info(); + let td = chain_info.pending_total_difficulty; let syncing_difficulty = max(self.syncing_difficulty, td); - if force || peer_difficulty > syncing_difficulty { - // start sync - self.syncing_difficulty = peer_difficulty; - if self.state == SyncState::Idle || self.state == SyncState::NotSynced { - self.state = SyncState::Blocks; - } - trace!(target: "sync", "Starting sync with better chain"); - self.peers.get_mut(&peer_id).unwrap().asking_hash = Some(peer_latest.clone()); - self.downloading_hashes.insert(peer_latest.clone()); - self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); - } - else if self.state == SyncState::Blocks && io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown { - self.request_blocks(io, peer_id, false); - } - } - fn current_base_block(&self) -> BlockNumber { - match self.last_imported_block { None => 0, Some(x) => x } - } - - fn find_block_bodies_hashes_to_request(&self, ignore_others: bool) -> (Vec, Vec) { - let mut needed_bodies: Vec = Vec::new(); - let mut needed_numbers: Vec = Vec::new(); - - if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.current_base_block() + 1 { - if let Some((start, ref items)) = self.headers.range_iter().next() { - let mut index: BlockNumber = 0; - while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST { - let block = start + index; - if ignore_others || (!self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block)) { - needed_bodies.push(items[index as usize].hash.clone()); - needed_numbers.push(block); + if force || self.state == SyncState::NewBlocks || peer_difficulty.map_or(true, |pd| pd > syncing_difficulty) { + match self.state { + SyncState::Idle => { + if self.last_imported_block < chain_info.best_block_number { + self.last_imported_block = chain_info.best_block_number; + self.last_imported_hash = chain_info.best_block_hash; } - index += 1; + trace!(target: "sync", "Starting sync with {}", peer_id); + self.start_sync_round(io); + self.sync_peer(io, peer_id, force); + }, + SyncState::ChainHead => { + // Request subchain headers + trace!(target: "sync", "Starting sync with better chain"); + let last = self.last_imported_hash.clone(); + self.request_headers_by_hash(io, peer_id, &last, 128, 255, false, PeerAsking::Heads); + }, + SyncState::Blocks | SyncState::NewBlocks => { + if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown { + self.request_blocks(io, peer_id, false); + } + } + SyncState::Waiting => () + } + } + } + + fn start_sync_round(&mut self, io: &mut SyncIo) { + self.state = SyncState::ChainHead; + trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block); + if self.imported_this_round.is_some() && self.imported_this_round.unwrap() == 0 && self.last_imported_block > 0 { + match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) { + Some(h) => { + self.last_imported_block -= 1; + self.last_imported_hash = h; + trace!(target: "sync", "Searching common header {} ({})", self.last_imported_block, self.last_imported_hash); + } + None => { + // TODO: get hash by number from the block queue + trace!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash); } } } - (needed_bodies, needed_numbers) + self.imported_this_round = None; } /// Find some headers or blocks to download for a peer. fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, ignore_others: bool) { self.clear_peer_download(peer_id); - if io.chain().queue_info().is_full() { self.pause_sync(); return; } // check to see if we need to download any block bodies first - let (needed_bodies, needed_numbers) = self.find_block_bodies_hashes_to_request(ignore_others); + let needed_bodies = self.blocks.needed_bodies(MAX_BODIES_TO_REQUEST, ignore_others); if !needed_bodies.is_empty() { - let (head, _) = self.headers.range_iter().next().unwrap(); - if needed_numbers.first().unwrap() - head > self.max_download_ahead_blocks as BlockNumber { - trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading block bodies", peer_id, needed_numbers.first().unwrap(), head); - self.request_blocks(io, peer_id, true); - } else { - self.downloading_bodies.extend(needed_numbers.iter()); - replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers); - self.request_bodies(io, peer_id, needed_bodies); - } + replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_bodies.clone()); + self.request_bodies(io, peer_id, needed_bodies); return; } - // check if need to download headers - let mut start = 0; - if !self.have_common_block { - // download backwards until common block is found 1 header at a time - let chain_info = io.chain().chain_info(); - start = match self.last_imported_block { - Some(n) => n, - None => chain_info.best_block_number, - }; - if !self.headers.is_empty() { - start = min(start, self.headers.range_iter().next().unwrap().0 - 1); - } - if start == 0 { - self.have_common_block = true; //reached genesis - self.last_imported_hash = Some(chain_info.genesis_hash); - self.last_imported_block = Some(0); - } - } - if self.have_common_block { - let mut headers: Vec = Vec::new(); - let mut prev = self.current_base_block() + 1; - let head = self.headers.range_iter().next().map(|(h, _)| h); - for (next, ref items) in self.headers.range_iter() { - if !headers.is_empty() { - break; - } - if next <= prev { - prev = next + items.len() as BlockNumber; - continue; - } - let mut block = prev; - while block < next && headers.len() < MAX_HEADERS_TO_REQUEST { - if ignore_others || !self.downloading_headers.contains(&(block as BlockNumber)) { - headers.push(block as BlockNumber); - } - block += 1; - } - prev = next + items.len() as BlockNumber; - } - - if !headers.is_empty() { - start = headers[0]; - if head.is_some() && start > head.unwrap() && start - head.unwrap() > self.max_download_ahead_blocks as BlockNumber { - trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading headers", peer_id, start, head.unwrap()); - self.request_blocks(io, peer_id, true); - return; - } - let count = headers.len(); - self.downloading_headers.extend(headers.iter()); - replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers); - assert!(!self.headers.have_item(&start)); - self.request_headers_by_number(io, peer_id, start, count, 0, false); - } - } - else { - // continue search for common block - self.downloading_headers.insert(start); - self.request_headers_by_number(io, peer_id, start, 1, 0, false); + // find subchain to download + if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST, ignore_others) { + replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, vec![h.clone()]); + self.request_headers_by_hash(io, peer_id, &h, count, 0, false, PeerAsking::BlockHeaders); } } /// Clear all blocks/headers marked as being downloaded by a peer. fn clear_peer_download(&mut self, peer_id: PeerId) { let peer = self.peers.get_mut(&peer_id).unwrap(); - if let Some(hash) = peer.asking_hash.take() { - self.downloading_hashes.remove(&hash); - } - for b in &peer.asking_blocks { - self.downloading_headers.remove(b); - self.downloading_bodies.remove(b); + match peer.asking { + PeerAsking::BlockHeaders | PeerAsking::Heads => { + for b in &peer.asking_blocks { + self.blocks.clear_header_download(&b); + } + }, + PeerAsking::BlockBodies => { + for b in &peer.asking_blocks { + self.blocks.clear_body_download(&b); + } + }, + _ => (), } peer.asking_blocks.clear(); } /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. fn collect_blocks(&mut self, io: &mut SyncIo) { - if !self.have_common_block || self.headers.is_empty() || self.bodies.is_empty() { - return; - } - let mut restart = false; - // merge headers and bodies - { - let headers = self.headers.range_iter().next().unwrap(); - let bodies = self.bodies.range_iter().next().unwrap(); - if headers.0 != bodies.0 || headers.0 > self.current_base_block() + 1 { - return; + let mut imported = HashSet::new(); + let blocks = self.blocks.drain(); + let count = blocks.len(); + for block in blocks { + let number = BlockView::new(&block).header_view().number(); + let h = BlockView::new(&block).header_view().sha3(); + + // Perform basic block verification + if !Block::is_good(&block) { + debug!(target: "sync", "Bad block rlp {:?} : {:?}", h, block); + restart = true; + break; } - let count = min(headers.1.len(), bodies.1.len()); - let mut imported = 0; - for i in 0..count { - let mut block_rlp = RlpStream::new_list(3); - block_rlp.append_raw(&headers.1[i].data, 1); - let body = Rlp::new(&bodies.1[i]); - block_rlp.append_raw(body.at(0).as_raw(), 1); - block_rlp.append_raw(body.at(1).as_raw(), 1); - let h = &headers.1[i].hash; - - // Perform basic block verification - if !Block::is_good(block_rlp.as_raw()) { - debug!(target: "sync", "Bad block rlp {:?} : {:?}", h, block_rlp.as_raw()); + match io.chain().import_block(block) { + Err(Error::Import(ImportError::AlreadyInChain)) => { + trace!(target: "sync", "Block already in chain {:?}", h); + }, + Err(Error::Import(ImportError::AlreadyQueued)) => { + trace!(target: "sync", "Block already queued {:?}", h); + }, + Ok(_) => { + trace!(target: "sync", "Block queued {:?}", h); + self.last_imported_block = number; + self.last_imported_hash = h.clone(); + imported.insert(h.clone()); + }, + Err(Error::Block(BlockError::UnknownParent(_))) if self.state == SyncState::NewBlocks => { + trace!(target: "sync", "Unknown new block parent, restarting sync"); + break; + }, + Err(e) => { + debug!(target: "sync", "Bad block {:?} : {:?}", h, e); restart = true; break; } - - match io.chain().import_block(block_rlp.out()) { - Err(Error::Import(ImportError::AlreadyInChain)) => { - trace!(target: "sync", "Block already in chain {:?}", h); - self.last_imported_block = Some(headers.0 + i as BlockNumber); - self.last_imported_hash = Some(h.clone()); - }, - Err(Error::Import(ImportError::AlreadyQueued)) => { - trace!(target: "sync", "Block already queued {:?}", h); - self.last_imported_block = Some(headers.0 + i as BlockNumber); - self.last_imported_hash = Some(h.clone()); - }, - Ok(_) => { - trace!(target: "sync", "Block queued {:?}", h); - self.last_imported_block = Some(headers.0 + i as BlockNumber); - self.last_imported_hash = Some(h.clone()); - imported += 1; - }, - Err(e) => { - debug!(target: "sync", "Bad block {:?} : {:?}", h, e); - restart = true; - } - } } - trace!(target: "sync", "Imported {} of {}", imported, count); } + trace!(target: "sync", "Imported {} of {}", imported.len(), count); + self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len()); if restart { self.restart_on_bad_block(io); return; } - self.headers.remove_head(&(self.last_imported_block.unwrap() + 1)); - self.bodies.remove_head(&(self.last_imported_block.unwrap() + 1)); - - if self.headers.is_empty() { - assert!(self.bodies.is_empty()); - self.complete_sync(); + if self.blocks.is_empty() { + // complete sync round + trace!(target: "sync", "Sync round complete"); + self.restart(io); } } - /// Remove downloaded bocks/headers starting from specified number. - /// Used to recover from an error and re-download parts of the chain detected as bad. - fn remove_downloaded_blocks(&mut self, start: BlockNumber) { - let ids = self.header_ids.drain().filter(|&(_, v)| v < start).collect(); - self.header_ids = ids; - let hdrs = self.downloading_headers.drain().filter(|v| *v < start).collect(); - self.downloading_headers = hdrs; - let bodies = self.downloading_bodies.drain().filter(|v| *v < start).collect(); - self.downloading_bodies = bodies; - self.headers.remove_from(&start); - self.bodies.remove_from(&start); - } - /// Request headers from a peer by block hash - fn request_headers_by_hash(&mut self, sync: &mut SyncIo, peer_id: PeerId, h: &H256, count: usize, skip: usize, reverse: bool) { + fn request_headers_by_hash(&mut self, sync: &mut SyncIo, peer_id: PeerId, h: &H256, count: usize, skip: usize, reverse: bool, asking: PeerAsking) { trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, h); let mut rlp = RlpStream::new_list(4); rlp.append(h); rlp.append(&count); rlp.append(&skip); rlp.append(&if reverse {1u32} else {0u32}); - self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); - } - - /// Request headers from a peer by block number - fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool) { - let mut rlp = RlpStream::new_list(4); - trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, n); - rlp.append(&n); - rlp.append(&count); - rlp.append(&skip); - rlp.append(&if reverse {1u32} else {0u32}); - self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); + self.send_request(sync, peer_id, asking, GET_BLOCK_HEADERS_PACKET, rlp.out()); } /// Request block bodies from a peer fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec) { let mut rlp = RlpStream::new_list(hashes.len()); - trace!(target: "sync", "{} <- GetBlockBodies: {} entries", peer_id, hashes.len()); + trace!(target: "sync", "{} <- GetBlockBodies: {} entries starting from {:?}", peer_id, hashes.len(), hashes.first()); for h in hashes { rlp.append(&h); } @@ -915,13 +840,16 @@ impl ChainSync { } /// Reset peer status after request is complete. - fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) { + fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) -> bool { let peer = self.peers.get_mut(&peer_id).unwrap(); if peer.asking != asking { - warn!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); + trace!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); + peer.asking = PeerAsking::Nothing; + false } else { peer.asking = PeerAsking::Nothing; + true } } @@ -975,7 +903,7 @@ impl ChainSync { balance: chain.balance(a), }; let _ = self.miner.import_transactions(transactions, fetch_account); - Ok(()) + Ok(()) } /// Send Status message @@ -1004,10 +932,9 @@ impl ChainSync { trace!(target: "sync", "-> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse); match io.chain().block_header(BlockID::Hash(hash)) { Some(hdr) => From::from(HeaderView::new(&hdr).number()), - None => last + None => return Ok(Some((BLOCK_HEADERS_PACKET, RlpStream::new_list(0)))) //no such header, return nothing } - } - else { + } else { trace!(target: "sync", "-> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", try!(r.val_at::(0)), max_headers, skip, reverse); try!(r.val_at(0)) }; @@ -1374,6 +1301,7 @@ mod tests { use ethcore::views::BlockView; use ethcore::header::*; use ethcore::client::*; + use ethcore::spec::Spec; use ethminer::{Miner, MinerService}; fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes { @@ -1430,6 +1358,7 @@ mod tests { fn return_receipts() { let mut client = TestBlockChainClient::new(); let mut queue = VecDeque::new(); + let mut sync = dummy_sync_with_peer(H256::new(), &client); let mut io = TestIo::new(&mut client, &mut queue, None); let mut receipt_list = RlpStream::new_list(4); @@ -1449,16 +1378,80 @@ mod tests { // the length of two rlp-encoded receipts assert_eq!(603, rlp_result.unwrap().1.out().len()); - let mut sync = dummy_sync_with_peer(H256::new()); io.sender = Some(2usize); sync.on_packet(&mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request); assert_eq!(1, io.queue.len()); } + #[test] + fn return_block_headers() { + use ethcore::views::HeaderView; + fn make_hash_req(h: &H256, count: usize, skip: usize, reverse: bool) -> Bytes { + let mut rlp = RlpStream::new_list(4); + rlp.append(h); + rlp.append(&count); + rlp.append(&skip); + rlp.append(&if reverse {1u32} else {0u32}); + rlp.out() + } + + fn make_num_req(n: usize, count: usize, skip: usize, reverse: bool) -> Bytes { + let mut rlp = RlpStream::new_list(4); + rlp.append(&n); + rlp.append(&count); + rlp.append(&skip); + rlp.append(&if reverse {1u32} else {0u32}); + rlp.out() + } + fn to_header_vec(rlp: ::chain::RlpResponseResult) -> Vec { + Rlp::new(&rlp.unwrap().unwrap().1.out()).iter().map(|r| r.as_raw().to_vec()).collect() + } + + let mut client = TestBlockChainClient::new(); + client.add_blocks(100, EachBlockWith::Nothing); + let blocks: Vec<_> = (0 .. 100).map(|i| (&client as &BlockChainClient).block(BlockID::Number(i as BlockNumber)).unwrap()).collect(); + let headers: Vec<_> = blocks.iter().map(|b| Rlp::new(b).at(0).as_raw().to_vec()).collect(); + let hashes: Vec<_> = headers.iter().map(|h| HeaderView::new(h).sha3()).collect(); + + let mut queue = VecDeque::new(); + let io = TestIo::new(&mut client, &mut queue, None); + + let unknown: H256 = H256::new(); + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, false))); + assert!(to_header_vec(result).is_empty()); + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, true))); + assert!(to_header_vec(result).is_empty()); + + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, true))); + assert_eq!(to_header_vec(result), vec![headers[2].clone()]); + + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, false))); + assert_eq!(to_header_vec(result), vec![headers[2].clone()]); + + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, false))); + assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]); + + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, true))); + assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]); + + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, true))); + assert_eq!(to_header_vec(result), vec![headers[2].clone()]); + + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, false))); + assert_eq!(to_header_vec(result), vec![headers[2].clone()]); + + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, false))); + assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]); + + let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, true))); + assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]); + } + #[test] fn return_nodes() { let mut client = TestBlockChainClient::new(); let mut queue = VecDeque::new(); + let mut sync = dummy_sync_with_peer(H256::new(), &client); let mut io = TestIo::new(&mut client, &mut queue, None); let mut node_list = RlpStream::new_list(3); @@ -1477,27 +1470,26 @@ mod tests { // the length of one rlp-encoded hashe assert_eq!(34, rlp_result.unwrap().1.out().len()); - let mut sync = dummy_sync_with_peer(H256::new()); io.sender = Some(2usize); sync.on_packet(&mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request); assert_eq!(1, io.queue.len()); } - fn dummy_sync_with_peer(peer_latest_hash: H256) -> ChainSync { - let mut sync = ChainSync::new(SyncConfig::default(), Arc::new(Miner::default())); + fn dummy_sync_with_peer(peer_latest_hash: H256, client: &BlockChainClient) -> ChainSync { + let mut sync = ChainSync::new(SyncConfig::default(), Miner::new(false, Spec::new_test()), client); sync.peers.insert(0, - PeerInfo { + PeerInfo { protocol_version: 0, genesis: H256::zero(), network_id: U256::zero(), latest_hash: peer_latest_hash, latest_number: None, - difficulty: U256::zero(), + difficulty: None, asking: PeerAsking::Nothing, - asking_blocks: Vec::::new(), + asking_blocks: Vec::new(), asking_hash: None, ask_time: 0f64, - }); + }); sync } @@ -1506,7 +1498,7 @@ mod tests { let mut client = TestBlockChainClient::new(); client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client); let chain_info = client.chain_info(); let io = TestIo::new(&mut client, &mut queue, None); @@ -1537,7 +1529,7 @@ mod tests { let mut client = TestBlockChainClient::new(); client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1556,7 +1548,7 @@ mod tests { let mut client = TestBlockChainClient::new(); client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); let peer_count = sync.propagate_blocks(&chain_info, &mut io); @@ -1577,8 +1569,8 @@ mod tests { let block_data = get_dummy_block(11, client.chain_info().best_block_hash); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); - sync.have_common_block = true; + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); + //sync.have_common_block = true; let mut io = TestIo::new(&mut client, &mut queue, None); let block = UntrustedRlp::new(&block_data); @@ -1596,7 +1588,7 @@ mod tests { let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut io = TestIo::new(&mut client, &mut queue, None); let block = UntrustedRlp::new(&block_data); @@ -1611,7 +1603,7 @@ mod tests { let mut client = TestBlockChainClient::new(); client.add_blocks(10, EachBlockWith::Uncle); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut io = TestIo::new(&mut client, &mut queue, None); let empty_data = vec![]; @@ -1627,7 +1619,7 @@ mod tests { let mut client = TestBlockChainClient::new(); client.add_blocks(10, EachBlockWith::Uncle); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut io = TestIo::new(&mut client, &mut queue, None); let hashes_data = get_dummy_hashes(); @@ -1643,7 +1635,7 @@ mod tests { let mut client = TestBlockChainClient::new(); client.add_blocks(10, EachBlockWith::Uncle); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut io = TestIo::new(&mut client, &mut queue, None); let empty_hashes_data = vec![]; @@ -1661,7 +1653,7 @@ mod tests { let mut client = TestBlockChainClient::new(); client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1679,7 +1671,7 @@ mod tests { let mut client = TestBlockChainClient::new(); client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1697,7 +1689,7 @@ mod tests { client.add_blocks(98, EachBlockWith::Uncle); client.add_blocks(1, EachBlockWith::UncleAndTransaction); client.add_blocks(1, EachBlockWith::Transaction); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let good_blocks = vec![client.block_hash_delta_minus(2)]; let retracted_blocks = vec![client.block_hash_delta_minus(1)]; @@ -1744,7 +1736,7 @@ mod tests { client.add_blocks(98, EachBlockWith::Uncle); client.add_blocks(1, EachBlockWith::UncleAndTransaction); client.add_blocks(1, EachBlockWith::Transaction); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let good_blocks = vec![client.block_hash_delta_minus(2)]; let retracted_blocks = vec![client.block_hash_delta_minus(1)]; @@ -1763,53 +1755,4 @@ mod tests { assert_eq!(status.transactions_in_pending_queue, 0); assert_eq!(status.transactions_in_future_queue, 0); } - - #[test] - fn returns_requested_block_headers() { - let mut client = TestBlockChainClient::new(); - client.add_blocks(100, EachBlockWith::Uncle); - let mut queue = VecDeque::new(); - let io = TestIo::new(&mut client, &mut queue, None); - - let mut rlp = RlpStream::new_list(4); - rlp.append(&0u64); - rlp.append(&10u64); - rlp.append(&0u64); - rlp.append(&0u64); - let data = rlp.out(); - - let response = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&data)); - - assert!(response.is_ok()); - let (_, rlp_stream) = response.unwrap().unwrap(); - let response_data = rlp_stream.out(); - let rlp = UntrustedRlp::new(&response_data); - assert!(rlp.at(0).is_ok()); - assert!(rlp.at(9).is_ok()); - } - - #[test] - fn returns_requested_block_headers_reverse() { - let mut client = TestBlockChainClient::new(); - client.add_blocks(100, EachBlockWith::Uncle); - let mut queue = VecDeque::new(); - let io = TestIo::new(&mut client, &mut queue, None); - - let mut rlp = RlpStream::new_list(4); - rlp.append(&15u64); - rlp.append(&15u64); - rlp.append(&0u64); - rlp.append(&1u64); - let data = rlp.out(); - - let response = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&data)); - - assert!(response.is_ok()); - let (_, rlp_stream) = response.unwrap().unwrap(); - let response_data = rlp_stream.out(); - let rlp = UntrustedRlp::new(&response_data); - assert!(rlp.at(0).is_ok()); - assert!(rlp.at(14).is_ok()); - assert!(!rlp.at(15).is_ok()); - } } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index f1b1bb0b2..41faea204 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -74,8 +74,8 @@ use io::NetSyncIo; use chain::ChainSync; mod chain; +mod blocks; mod io; -mod range_collection; #[cfg(test)] mod tests; @@ -116,9 +116,10 @@ pub use self::chain::{SyncStatus, SyncState}; impl EthSync { /// Creates and register protocol with the network service pub fn register(service: &mut NetworkService, config: SyncConfig, chain: Arc, miner: Arc) -> Arc { + let sync = ChainSync::new(config, miner, chain.deref()); let sync = Arc::new(EthSync { chain: chain, - sync: RwLock::new(ChainSync::new(config, miner)), + sync: RwLock::new(sync), }); service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); sync diff --git a/sync/src/range_collection.rs b/sync/src/range_collection.rs deleted file mode 100644 index 6b57f0a4b..000000000 --- a/sync/src/range_collection.rs +++ /dev/null @@ -1,317 +0,0 @@ -// Copyright 2015, 2016 Ethcore (UK) Ltd. -// This file is part of Parity. - -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . - -/// This module defines a trait for a collection of ranged values and an implementation -/// for this trait over sorted vector. - -use std::ops::{Add, Sub, Range}; - -pub trait ToUsize { - fn to_usize(&self) -> usize; -} - -pub trait FromUsize { - fn from_usize(s: usize) -> Self; -} - -/// A key-value collection orderd by key with sequential key-value pairs grouped together. -/// Such group is called a range. -/// E.g. a set of collection of 5 pairs {1, a}, {2, b}, {10, x}, {11, y}, {12, z} will be grouped into two ranges: {1, [a,b]}, {10, [x,y,z]} -pub trait RangeCollection { - /// Check if the given key is present in the collection. - fn have_item(&self, key: &K) -> bool; - /// Get value by key. - fn find_item(&self, key: &K) -> Option<&V>; - /// Get a range of keys from `key` till the end of the range that has `key` - /// Returns an empty range is key does not exist. - fn get_tail(&mut self, key: &K) -> Range; - /// Remove all elements < `start` in the range that contains `start` - 1 - fn remove_head(&mut self, start: &K); - /// Remove all elements >= `start` in the range that contains `start` - fn remove_tail(&mut self, start: &K); - /// Remove all elements >= `start` - fn remove_from(&mut self, start: &K); - /// Remove all elements >= `tail` - fn insert_item(&mut self, key: K, value: V); - /// Get an iterator over ranges - fn range_iter(& self) -> RangeIterator; -} - -/// Range iterator. For each range yelds a key for the first element of the range and a vector of values. -pub struct RangeIterator<'c, K:'c, V:'c> { - range: usize, - collection: &'c Vec<(K, Vec)> -} - -impl<'c, K:'c, V:'c> Iterator for RangeIterator<'c, K, V> where K: Add + FromUsize + ToUsize + Copy { - type Item = (K, &'c [V]); - // The 'Iterator' trait only requires the 'next' method to be defined. The - // return type is 'Option', 'None' is returned when the 'Iterator' is - // over, otherwise the next value is returned wrapped in 'Some' - fn next(&mut self) -> Option<(K, &'c [V])> { - if self.range > 0 { - self.range -= 1; - } - else { - return None; - } - match self.collection.get(self.range) { - Some(&(ref k, ref vec)) => { - Some((*k, vec)) - }, - None => None - } - } -} - -impl RangeCollection for Vec<(K, Vec)> where K: Ord + PartialEq + Add + Sub + Copy + FromUsize + ToUsize { - fn range_iter(&self) -> RangeIterator { - RangeIterator { - range: self.len(), - collection: self - } - } - - fn have_item(&self, key: &K) -> bool { - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(_) => true, - Err(index) => match self.get(index) { - Some(&(ref k, ref v)) => k <= key && (*k + FromUsize::from_usize(v.len())) > *key, - _ => false - }, - } - } - - fn find_item(&self, key: &K) -> Option<&V> { - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(index) => self.get(index).unwrap().1.get(0), - Err(index) => match self.get(index) { - Some(&(ref k, ref v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => v.get((*key - *k).to_usize()), - _ => None - }, - } - } - - fn get_tail(&mut self, key: &K) -> Range { - let kv = *key; - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(index) => kv..(kv + FromUsize::from_usize(self[index].1.len())), - Err(index) => { - match self.get_mut(index) { - Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { - kv..(*k + FromUsize::from_usize(v.len())) - } - _ => kv..kv - } - }, - } - } - /// Remove element key and following elements in the same range - fn remove_tail(&mut self, key: &K) { - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(index) => { self.remove(index); }, - Err(index) =>{ - let mut empty = false; - match self.get_mut(index) { - Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { - v.truncate((*key - *k).to_usize()); - empty = v.is_empty(); - } - _ => {} - } - if empty { - self.remove(index); - } - }, - } - } - - /// Remove the element and all following it. - fn remove_from(&mut self, key: &K) { - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(index) => { self.drain(.. index + 1); }, - Err(index) =>{ - let mut empty = false; - match self.get_mut(index) { - Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { - v.truncate((*key - *k).to_usize()); - empty = v.is_empty(); - } - _ => {} - } - if empty { - self.drain(.. index + 1); - } else { - self.drain(.. index); - } - }, - } - } - - /// Remove range elements up to key - fn remove_head(&mut self, key: &K) { - if *key == FromUsize::from_usize(0) { - return - } - - let prev = *key - FromUsize::from_usize(1); - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(_) => { }, //start of range, do nothing. - Err(index) => { - let mut empty = false; - match self.get_mut(index) { - Some(&mut (ref mut k, ref mut v)) if *k <= prev && (*k + FromUsize::from_usize(v.len())) > prev => { - let tail = v.split_off((*key - *k).to_usize()); - empty = tail.is_empty(); - let removed = ::std::mem::replace(v, tail); - let new_k = *k + FromUsize::from_usize(removed.len()); - ::std::mem::replace(k, new_k); - } - _ => {} - } - if empty { - self.remove(index); - } - }, - } - } - - fn insert_item(&mut self, key: K, value: V) { - assert!(!self.have_item(&key)); - - // todo: fix warning - let lower = match self.binary_search_by(|&(k, _)| k.cmp(&key).reverse()) { - Ok(index) | Err(index) => index - }; - - let mut to_remove: Option = None; - if lower < self.len() && self[lower].0 + FromUsize::from_usize(self[lower].1.len()) == key { - // extend into existing chunk - self[lower].1.push(value); - } - else { - // insert a new chunk - let range: Vec = vec![value]; - self.insert(lower, (key, range)); - }; - if lower > 0 { - let next = lower - 1; - if next < self.len() - { - { - let (mut next, mut inserted) = self.split_at_mut(lower); - let mut next = next.last_mut().unwrap(); - let mut inserted = inserted.first_mut().unwrap(); - if next.0 == key + FromUsize::from_usize(1) - { - inserted.1.append(&mut next.1); - to_remove = Some(lower - 1); - } - } - - if let Some(r) = to_remove { - self.remove(r); - } - } - } - } -} - -#[test] -#[cfg_attr(feature="dev", allow(cyclomatic_complexity))] -fn test_range() { - use std::cmp::{Ordering}; - - let mut ranges: Vec<(u64, Vec)> = Vec::new(); - assert_eq!(ranges.range_iter().next(), None); - assert_eq!(ranges.find_item(&1), None); - assert!(!ranges.have_item(&1)); - assert_eq!(ranges.get_tail(&0), 0..0); - - ranges.insert_item(17, 'q'); - assert_eq!(ranges.range_iter().cmp(vec![(17, &['q'][..])]), Ordering::Equal); - assert_eq!(ranges.find_item(&17), Some(&'q')); - assert!(ranges.have_item(&17)); - assert_eq!(ranges.get_tail(&17), 17..18); - - ranges.insert_item(18, 'r'); - assert_eq!(ranges.range_iter().cmp(vec![(17, &['q', 'r'][..])]), Ordering::Equal); - assert_eq!(ranges.find_item(&18), Some(&'r')); - assert!(ranges.have_item(&18)); - assert_eq!(ranges.get_tail(&17), 17..19); - - ranges.insert_item(16, 'p'); - assert_eq!(ranges.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal); - assert_eq!(ranges.find_item(&16), Some(&'p')); - assert_eq!(ranges.find_item(&17), Some(&'q')); - assert_eq!(ranges.find_item(&18), Some(&'r')); - assert!(ranges.have_item(&16)); - assert_eq!(ranges.get_tail(&17), 17..19); - assert_eq!(ranges.get_tail(&16), 16..19); - - ranges.insert_item(2, 'b'); - assert_eq!(ranges.range_iter().cmp(vec![(2, &['b'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - assert_eq!(ranges.find_item(&2), Some(&'b')); - - ranges.insert_item(3, 'c'); - ranges.insert_item(4, 'd'); - assert_eq!(ranges.get_tail(&3), 3..5); - assert_eq!(ranges.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - - let mut r = ranges.clone(); - r.remove_head(&1); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&2); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&3); - assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&10); - assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&5); - assert_eq!(r.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&19); - assert_eq!(r.range_iter().next(), None); - - let mut r = ranges.clone(); - r.remove_tail(&20); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_tail(&17); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal); - r.remove_tail(&16); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal); - r.remove_tail(&3); - assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal); - r.remove_tail(&2); - assert_eq!(r.range_iter().next(), None); - - let mut r = ranges.clone(); - r.remove_from(&20); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_from(&18); - assert!(!r.have_item(&18)); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q'][..])]), Ordering::Equal); - r.remove_from(&16); - assert!(!r.have_item(&16)); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal); - r.remove_from(&3); - assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal); - r.remove_from(&1); - assert_eq!(r.range_iter().next(), None); - let mut r = ranges.clone(); - r.remove_from(&2); - assert_eq!(r.range_iter().next(), None); -} - diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index d89db4b27..2e3ec1f4c 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -30,6 +30,16 @@ fn two_peers() { assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); } +#[test] +fn long_chain() { + ::env_logger::init().ok(); + let mut net = TestNet::new(2); + net.peer_mut(1).chain.add_blocks(50000, EachBlockWith::Nothing); + net.sync(); + assert!(net.peer(0).chain.block(BlockID::Number(50000)).is_some()); + assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); +} + #[test] fn status_after_sync() { ::env_logger::init().ok(); @@ -47,7 +57,7 @@ fn takes_few_steps() { net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Uncle); let total_steps = net.sync(); - assert!(total_steps < 7); + assert!(total_steps < 20); } #[test] @@ -79,6 +89,7 @@ fn forked() { // peer 1 has the best chain of 601 blocks let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); net.sync(); + assert_eq!(net.peer(0).chain.difficulty.read().unwrap().deref(), net.peer(1).chain.difficulty.read().unwrap().deref()); assert_eq!(net.peer(0).chain.numbers.read().unwrap().deref(), &peer1_chain); assert_eq!(net.peer(1).chain.numbers.read().unwrap().deref(), &peer1_chain); assert_eq!(net.peer(2).chain.numbers.read().unwrap().deref(), &peer1_chain); @@ -97,13 +108,13 @@ fn restart() { net.restart_peer(0); let status = net.peer(0).sync.status(); - assert_eq!(status.state, SyncState::NotSynced); + assert_eq!(status.state, SyncState::ChainHead); } #[test] fn status_empty() { let net = TestNet::new(2); - assert_eq!(net.peer(0).sync.status().state, SyncState::NotSynced); + assert_eq!(net.peer(0).sync.status().state, SyncState::Idle); } #[test] @@ -166,8 +177,17 @@ fn restart_on_malformed_block() { let mut net = TestNet::new(2); net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); net.peer_mut(1).chain.corrupt_block(6); - net.sync_steps(10); + net.sync_steps(20); - assert_eq!(net.peer(0).chain.chain_info().best_block_number, 4); + assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5); } +#[test] +fn restart_on_broken_chain() { + let mut net = TestNet::new(2); + net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); + net.peer_mut(1).chain.corrupt_block_parent(6); + net.sync_steps(20); + + assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5); +} diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 461553924..d1ffde0f0 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -92,9 +92,11 @@ impl TestNet { started: false, }; for _ in 0..n { + let chain = TestBlockChainClient::new(); + let sync = ChainSync::new(SyncConfig::default(), Miner::new(false, Spec::new_test()), &chain); net.peers.push(TestPeer { - chain: TestBlockChainClient::new(), - sync: ChainSync::new(SyncConfig::default(), Miner::new(false, Spec::new_test())), + sync: sync, + chain: chain, queue: VecDeque::new(), }); }