diff --git a/sync/chain.rs b/sync/chain.rs deleted file mode 100644 index 3eaf69cb1..000000000 --- a/sync/chain.rs +++ /dev/null @@ -1,976 +0,0 @@ -/// -/// BlockChain synchronization strategy. -/// Syncs to peers and keeps up to date. -/// This implementation uses ethereum protocol v63 -/// -/// Syncing strategy. -/// -/// 1. A peer arrives with a total difficulty better than ours -/// 2. Find a common best block between our an peer chain. -/// Start with out best block and request headers from peer backwards until a common block is found -/// 3. Download headers and block bodies from peers in parallel. -/// As soon as a set of the blocks is fully downloaded at the head of the queue it is fed to the blockchain -/// 4. Maintain sync by handling NewBlocks/NewHashes messages -/// - -use util::*; -use std::mem::{replace}; -use views::{HeaderView}; -use header::{BlockNumber, Header as BlockHeader}; -use client::{BlockChainClient, BlockStatus}; -use sync::range_collection::{RangeCollection, ToUsize, FromUsize}; -use error::*; -use sync::io::SyncIo; -use std::option::Option; - -impl ToUsize for BlockNumber { - fn to_usize(&self) -> usize { - *self as usize - } -} - -impl FromUsize for BlockNumber { - fn from_usize(s: usize) -> BlockNumber { - s as BlockNumber - } -} - -type PacketDecodeError = DecoderError; - -const PROTOCOL_VERSION: u8 = 63u8; -const MAX_BODIES_TO_SEND: usize = 256; -const MAX_HEADERS_TO_SEND: usize = 512; -const MAX_NODE_DATA_TO_SEND: usize = 1024; -const MAX_RECEIPTS_TO_SEND: usize = 1024; -const MAX_HEADERS_TO_REQUEST: usize = 512; -const MAX_BODIES_TO_REQUEST: usize = 256; - -const STATUS_PACKET: u8 = 0x00; -const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; -const TRANSACTIONS_PACKET: u8 = 0x02; -const GET_BLOCK_HEADERS_PACKET: u8 = 0x03; -const BLOCK_HEADERS_PACKET: u8 = 0x04; -const GET_BLOCK_BODIES_PACKET: u8 = 0x05; -const BLOCK_BODIES_PACKET: u8 = 0x06; -const NEW_BLOCK_PACKET: u8 = 0x07; - -const GET_NODE_DATA_PACKET: u8 = 0x0d; -const NODE_DATA_PACKET: u8 = 0x0e; -const GET_RECEIPTS_PACKET: u8 = 0x0f; -const RECEIPTS_PACKET: u8 = 0x10; - -const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent - -struct Header { - /// Header data - data: Bytes, - /// Block hash - hash: H256, - /// Parent hash - parent: H256, -} - -/// Used to identify header by transactions and uncles hashes -#[derive(Eq, PartialEq, Hash)] -struct HeaderId { - transactions_root: H256, - uncles: H256 -} - -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -/// Sync state -pub enum SyncState { - /// Initial chain sync has not started yet - NotSynced, - /// Initial chain sync complete. Waiting for new packets - Idle, - /// Block downloading paused. Waiting for block queue to process blocks and free some space - Waiting, - /// Downloading blocks - Blocks, - /// Downloading blocks learned from NewHashes packet - NewBlocks, -} - -/// Syncing status and statistics -pub struct SyncStatus { - /// State - pub state: SyncState, - /// Syncing protocol version. That's the maximum protocol version we connect to. - pub protocol_version: u8, - /// BlockChain height for the moment the sync started. - pub start_block_number: BlockNumber, - /// Last fully downloaded and imported block number (if any). - pub last_imported_block_number: Option, - /// Highest block number in the download queue (if any). - pub highest_block_number: Option, - /// Total number of blocks for the sync process. - pub blocks_total: BlockNumber, - /// Number of blocks downloaded so far. - pub blocks_received: BlockNumber, - /// Total number of connected peers - pub num_peers: usize, - /// Total number of active peers - pub num_active_peers: usize, -} - -#[derive(PartialEq, Eq, Debug)] -/// Peer data type requested -enum PeerAsking { - Nothing, - BlockHeaders, - BlockBodies, -} - -/// Syncing peer information -struct PeerInfo { - /// eth protocol version - protocol_version: u32, - /// Peer chain genesis hash - genesis: H256, - /// Peer network id - network_id: U256, - /// Peer best block hash - latest: H256, - /// Peer total difficulty - difficulty: U256, - /// Type of data currenty being requested from peer. - asking: PeerAsking, - /// A set of block numbers being requested - asking_blocks: Vec, -} - -/// Blockchain sync handler. -/// See module documentation for more details. -pub struct ChainSync { - /// Sync state - state: SyncState, - /// Last block number for the start of sync - starting_block: BlockNumber, - /// Highest block number seen - highest_block: Option, - /// Set of block header numbers being downloaded - downloading_headers: HashSet, - /// Set of block body numbers being downloaded - downloading_bodies: HashSet, - /// Downloaded headers. - headers: Vec<(BlockNumber, Vec
)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order - /// Downloaded bodies - bodies: Vec<(BlockNumber, Vec)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order - /// Peer info - peers: HashMap, - /// Used to map body to header - header_ids: HashMap, - /// Last impoted block number - last_imported_block: Option, - /// Last impoted block hash - last_imported_hash: Option, - /// Syncing total difficulty - syncing_difficulty: U256, - /// True if common block for our and remote chain has been found - have_common_block: bool, -} - - -impl ChainSync { - /// Create a new instance of syncing strategy. - pub fn new() -> ChainSync { - ChainSync { - state: SyncState::NotSynced, - starting_block: 0, - highest_block: None, - downloading_headers: HashSet::new(), - downloading_bodies: HashSet::new(), - headers: Vec::new(), - bodies: Vec::new(), - peers: HashMap::new(), - header_ids: HashMap::new(), - last_imported_block: None, - last_imported_hash: None, - syncing_difficulty: U256::from(0u64), - have_common_block: false, - } - } - - /// @returns Synchonization status - pub fn status(&self) -> SyncStatus { - SyncStatus { - state: self.state.clone(), - protocol_version: 63, - start_block_number: self.starting_block, - last_imported_block_number: self.last_imported_block, - highest_block_number: self.highest_block, - blocks_received: match self.last_imported_block { None => 0, Some(x) => x - self.starting_block }, - blocks_total: match self.highest_block { None => 0, Some(x) => x - self.starting_block }, - num_peers: self.peers.len(), - num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), - } - } - - /// Abort all sync activity - pub fn abort(&mut self, io: &mut SyncIo) { - self.restart(io); - self.peers.clear(); - } - - /// Rest sync. Clear all downloaded data but keep the queue - fn reset(&mut self) { - self.downloading_headers.clear(); - self.downloading_bodies.clear(); - self.headers.clear(); - self.bodies.clear(); - for (_, ref mut p) in &mut self.peers { - p.asking_blocks.clear(); - } - self.header_ids.clear(); - self.syncing_difficulty = From::from(0u64); - self.state = SyncState::Idle; - } - - /// Restart sync - pub fn restart(&mut self, io: &mut SyncIo) { - self.reset(); - self.last_imported_block = None; - self.last_imported_hash = None; - self.starting_block = 0; - self.highest_block = None; - self.have_common_block = false; - io.chain().clear_queue(); - self.starting_block = io.chain().chain_info().best_block_number; - self.state = SyncState::NotSynced; - } - - /// Called by peer to report status - fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let peer = PeerInfo { - protocol_version: try!(r.val_at(0)), - network_id: try!(r.val_at(1)), - difficulty: try!(r.val_at(2)), - latest: try!(r.val_at(3)), - genesis: try!(r.val_at(4)), - asking: PeerAsking::Nothing, - asking_blocks: Vec::new(), - }; - - trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); - - let chain_info = io.chain().chain_info(); - if peer.genesis != chain_info.genesis_hash { - io.disable_peer(peer_id); - trace!(target: "sync", "Peer {} genesis hash not matched", peer_id); - return Ok(()); - } - if peer.network_id != NETWORK_ID { - io.disable_peer(peer_id); - trace!(target: "sync", "Peer {} network id not matched", peer_id); - return Ok(()); - } - - let old = self.peers.insert(peer_id.clone(), peer); - if old.is_some() { - panic!("ChainSync: new peer already exists"); - } - info!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); - self.sync_peer(io, peer_id, false); - Ok(()) - } - - #[allow(cyclomatic_complexity)] - /// Called by peer once it has new block headers during sync - fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders); - let item_count = r.item_count(); - trace!(target: "sync", "{} -> BlockHeaders ({} entries)", peer_id, item_count); - self.clear_peer_download(peer_id); - if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { - trace!(target: "sync", "Ignored unexpected block headers"); - return Ok(()); - } - if self.state == SyncState::Waiting { - trace!(target: "sync", "Ignored block headers while waiting"); - return Ok(()); - } - - for i in 0..item_count { - let info: BlockHeader = try!(r.val_at(i)); - let number = BlockNumber::from(info.number); - if number <= self.current_base_block() || self.headers.have_item(&number) { - trace!(target: "sync", "Skipping existing block header"); - continue; - } - - if self.highest_block == None || number > self.highest_block.unwrap() { - self.highest_block = Some(number); - } - let hash = info.hash(); - match io.chain().block_status(&hash) { - BlockStatus::InChain => { - self.have_common_block = true; - self.last_imported_block = Some(number); - self.last_imported_hash = Some(hash.clone()); - trace!(target: "sync", "Found common header {} ({})", number, hash); - }, - _ => { - if self.have_common_block { - //validate chain - let base_hash = self.last_imported_hash.clone().unwrap(); - if self.have_common_block && number == self.current_base_block() + 1 && info.parent_hash != base_hash { - // TODO: lower peer rating - debug!(target: "sync", "Mismatched block header {} {}", number, hash); - continue; - } - if self.headers.find_item(&(number - 1)).map_or(false, |p| p.hash != info.parent_hash) { - // mismatching parent id, delete the previous block and don't add this one - // TODO: lower peer rating - debug!(target: "sync", "Mismatched block header {} {}", number, hash); - self.remove_downloaded_blocks(number - 1); - continue; - } - if self.headers.find_item(&(number + 1)).map_or(false, |p| p.parent != hash) { - // mismatching parent id for the next block, clear following headers - debug!(target: "sync", "Mismatched block header {}", number + 1); - self.remove_downloaded_blocks(number + 1); - } - } - let hdr = Header { - data: try!(r.at(i)).as_raw().to_vec(), - hash: hash.clone(), - parent: info.parent_hash, - }; - self.headers.insert_item(number, hdr); - let header_id = HeaderId { - transactions_root: info.transactions_root, - uncles: info.uncles_hash - }; - trace!(target: "sync", "Got header {} ({})", number, hash); - if header_id.transactions_root == rlp::SHA3_NULL_RLP && header_id.uncles == rlp::SHA3_EMPTY_LIST_RLP { - //empty body, just mark as downloaded - let mut body_stream = RlpStream::new_list(2); - body_stream.append_raw(&rlp::NULL_RLP, 1); - body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1); - self.bodies.insert_item(number, body_stream.out()); - } - else { - self.header_ids.insert(header_id, number); - } - } - } - } - self.collect_blocks(io); - self.continue_sync(io); - Ok(()) - } - - /// Called by peer once it has new block bodies - fn on_peer_block_bodies(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - use util::triehash::ordered_trie_root; - self.reset_peer_asking(peer_id, PeerAsking::BlockBodies); - let item_count = r.item_count(); - trace!(target: "sync", "{} -> BlockBodies ({} entries)", peer_id, item_count); - self.clear_peer_download(peer_id); - if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { - trace!(target: "sync", "Ignored unexpected block bodies"); - return Ok(()); - } - if self.state == SyncState::Waiting { - trace!(target: "sync", "Ignored block bodies while waiting"); - return Ok(()); - } - for i in 0..item_count { - let body = try!(r.at(i)); - let tx = try!(body.at(0)); - let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw().to_vec()).collect()); //TODO: get rid of vectors here - let uncles = try!(body.at(1)).as_raw().sha3(); - let header_id = HeaderId { - transactions_root: tx_root, - uncles: uncles - }; - match self.header_ids.get(&header_id).cloned() { - Some(n) => { - self.header_ids.remove(&header_id); - self.bodies.insert_item(n, body.as_raw().to_vec()); - trace!(target: "sync", "Got body {}", n); - } - None => { - debug!(target: "sync", "Ignored unknown block body"); - } - } - } - self.collect_blocks(io); - self.continue_sync(io); - Ok(()) - } - - /// Called by peer once it has new block bodies - fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let block_rlp = try!(r.at(0)); - let header_rlp = try!(block_rlp.at(0)); - let h = header_rlp.as_raw().sha3(); - - trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); - let header_view = HeaderView::new(header_rlp.as_raw()); - // TODO: Decompose block and add to self.headers and self.bodies instead - if header_view.number() == From::from(self.current_base_block() + 1) { - match io.chain().import_block(block_rlp.as_raw().to_vec()) { - Err(ImportError::AlreadyInChain) => { - trace!(target: "sync", "New block already in chain {:?}", h); - }, - Err(ImportError::AlreadyQueued) => { - trace!(target: "sync", "New block already queued {:?}", h); - }, - Ok(_) => { - trace!(target: "sync", "New block queued {:?}", h); - }, - Err(e) => { - debug!(target: "sync", "Bad new block {:?} : {:?}", h, e); - io.disable_peer(peer_id); - } - }; - } - else { - trace!(target: "sync", "New block unknown {:?}", h); - //TODO: handle too many unknown blocks - let difficulty: U256 = try!(r.val_at(1)); - let peer_difficulty = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").difficulty; - if difficulty > peer_difficulty { - trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); - { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - peer.latest = header_view.sha3(); - } - self.sync_peer(io, peer_id, true); - } - } - Ok(()) - } - - /// Handles NewHashes packet. Initiates headers download for any unknown hashes. - fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - if self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking != PeerAsking::Nothing { - trace!(target: "sync", "Ignoring new hashes since we're already downloading."); - return Ok(()); - } - trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()); - let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); - let mut max_height: U256 = From::from(0); - for (rh, rd) in hashes { - let h = try!(rh); - let d = try!(rd); - match io.chain().block_status(&h) { - BlockStatus::InChain => { - trace!(target: "sync", "New block hash already in chain {:?}", h); - }, - BlockStatus::Queued => { - trace!(target: "sync", "New hash block already queued {:?}", h); - }, - BlockStatus::Unknown => { - trace!(target: "sync", "New unknown block hash {:?}", h); - if d > max_height { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - peer.latest = h.clone(); - max_height = d; - } - }, - BlockStatus::Bad =>{ - debug!(target: "sync", "Bad new block hash {:?}", h); - io.disable_peer(peer_id); - return Ok(()); - } - } - }; - if max_height != x!(0) { - self.sync_peer(io, peer_id, true); - } - Ok(()) - } - - /// Called by peer when it is disconnecting - pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { - trace!(target: "sync", "== Disconnecting {}", peer); - if self.peers.contains_key(&peer) { - info!(target: "sync", "Disconnected {}:{}", peer, io.peer_info(peer)); - self.clear_peer_download(peer); - self.peers.remove(&peer); - self.continue_sync(io); - } - } - - /// Called when a new peer is connected - pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) { - trace!(target: "sync", "== Connected {}", peer); - self.send_status(io, peer); - } - - /// Resume downloading - fn continue_sync(&mut self, io: &mut SyncIo) { - let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty)).collect(); - peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating - for (p, _) in peers { - self.sync_peer(io, p, false); - } - } - - /// Called after all blocks have been donloaded - fn complete_sync(&mut self) { - trace!(target: "sync", "Sync complete"); - self.reset(); - self.state = SyncState::Idle; - } - - /// Enter waiting state - fn pause_sync(&mut self) { - trace!(target: "sync", "Block queue full, pausing sync"); - self.state = SyncState::Waiting; - } - - /// Find something to do for a peer. Called for a new peer or when a peer is done with it's task. - fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { - let (peer_latest, peer_difficulty) = { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - if peer.asking != PeerAsking::Nothing { - return; - } - if self.state == SyncState::Waiting { - trace!(target: "sync", "Waiting for block queue"); - return; - } - (peer.latest.clone(), peer.difficulty.clone()) - }; - - let td = io.chain().chain_info().pending_total_difficulty; - let syncing_difficulty = max(self.syncing_difficulty, td); - if force || peer_difficulty > syncing_difficulty { - // start sync - self.syncing_difficulty = peer_difficulty; - if self.state == SyncState::Idle || self.state == SyncState::NotSynced { - self.state = SyncState::Blocks; - } - trace!(target: "sync", "Starting sync with better chain"); - self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); - } - else if self.state == SyncState::Blocks { - self.request_blocks(io, peer_id); - } - } - - fn current_base_block(&self) -> BlockNumber { - match self.last_imported_block { None => 0, Some(x) => x } - } - - /// Find some headers or blocks to download for a peer. - fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId) { - self.clear_peer_download(peer_id); - - if io.chain().queue_info().full { - self.pause_sync(); - return; - } - - // check to see if we need to download any block bodies first - let mut needed_bodies: Vec = Vec::new(); - let mut needed_numbers: Vec = Vec::new(); - - if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.current_base_block() + 1 { - for (start, ref items) in self.headers.range_iter() { - if needed_bodies.len() > MAX_BODIES_TO_REQUEST { - break; - } - let mut index: BlockNumber = 0; - while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST { - let block = start + index; - if !self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block) { - needed_bodies.push(items[index as usize].hash.clone()); - needed_numbers.push(block); - self.downloading_bodies.insert(block); - } - index += 1; - } - } - } - if !needed_bodies.is_empty() { - replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers); - self.request_bodies(io, peer_id, needed_bodies); - } - else { - // check if need to download headers - let mut start = 0usize; - if !self.have_common_block { - // download backwards until common block is found 1 header at a time - let chain_info = io.chain().chain_info(); - start = chain_info.best_block_number as usize; - if !self.headers.is_empty() { - start = min(start, self.headers.range_iter().next().unwrap().0 as usize - 1); - } - if start == 0 { - self.have_common_block = true; //reached genesis - self.last_imported_hash = Some(chain_info.genesis_hash); - } - } - if self.have_common_block { - let mut headers: Vec = Vec::new(); - let mut prev = self.current_base_block() + 1; - for (next, ref items) in self.headers.range_iter() { - if !headers.is_empty() { - break; - } - if next <= prev { - prev = next + items.len() as BlockNumber; - continue; - } - let mut block = prev; - while block < next && headers.len() <= MAX_HEADERS_TO_REQUEST { - if !self.downloading_headers.contains(&(block as BlockNumber)) { - headers.push(block as BlockNumber); - self.downloading_headers.insert(block as BlockNumber); - } - block += 1; - } - prev = next + items.len() as BlockNumber; - } - - if !headers.is_empty() { - start = headers[0] as usize; - let count = headers.len(); - replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers); - assert!(!self.headers.have_item(&(start as BlockNumber))); - self.request_headers_by_number(io, peer_id, start as BlockNumber, count, 0, false); - } - } - else { - self.request_headers_by_number(io, peer_id, start as BlockNumber, 1, 0, false); - } - } - } - - /// Clear all blocks/headers marked as being downloaded by a peer. - fn clear_peer_download(&mut self, peer_id: PeerId) { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - for b in &peer.asking_blocks { - self.downloading_headers.remove(&b); - self.downloading_bodies.remove(&b); - } - peer.asking_blocks.clear(); - } - - /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. - fn collect_blocks(&mut self, io: &mut SyncIo) { - if !self.have_common_block || self.headers.is_empty() || self.bodies.is_empty() { - return; - } - - let mut restart = false; - // merge headers and bodies - { - let headers = self.headers.range_iter().next().unwrap(); - let bodies = self.bodies.range_iter().next().unwrap(); - if headers.0 != bodies.0 || headers.0 != self.current_base_block() + 1 { - return; - } - - let count = min(headers.1.len(), bodies.1.len()); - let mut imported = 0; - for i in 0..count { - let mut block_rlp = RlpStream::new_list(3); - block_rlp.append_raw(&headers.1[i].data, 1); - let body = Rlp::new(&bodies.1[i]); - block_rlp.append_raw(body.at(0).as_raw(), 1); - block_rlp.append_raw(body.at(1).as_raw(), 1); - let h = &headers.1[i].hash; - match io.chain().import_block(block_rlp.out()) { - Err(ImportError::AlreadyInChain) => { - trace!(target: "sync", "Block already in chain {:?}", h); - self.last_imported_block = Some(headers.0 + i as BlockNumber); - self.last_imported_hash = Some(h.clone()); - }, - Err(ImportError::AlreadyQueued) => { - trace!(target: "sync", "Block already queued {:?}", h); - self.last_imported_block = Some(headers.0 + i as BlockNumber); - self.last_imported_hash = Some(h.clone()); - }, - Ok(_) => { - trace!(target: "sync", "Block queued {:?}", h); - self.last_imported_block = Some(headers.0 + i as BlockNumber); - self.last_imported_hash = Some(h.clone()); - imported += 1; - }, - Err(e) => { - debug!(target: "sync", "Bad block {:?} : {:?}", h, e); - restart = true; - } - } - } - trace!(target: "sync", "Imported {} of {}", imported, count); - } - - if restart { - self.restart(io); - return; - } - - self.headers.remove_head(&(self.last_imported_block.unwrap() + 1)); - self.bodies.remove_head(&(self.last_imported_block.unwrap() + 1)); - - if self.headers.is_empty() { - assert!(self.bodies.is_empty()); - self.complete_sync(); - } - } - - /// Remove downloaded bocks/headers starting from specified number. - /// Used to recover from an error and re-download parts of the chain detected as bad. - fn remove_downloaded_blocks(&mut self, start: BlockNumber) { - for n in self.headers.get_tail(&start) { - if let Some(ref header_data) = self.headers.find_item(&n) { - let header_to_delete = HeaderView::new(&header_data.data); - let header_id = HeaderId { - transactions_root: header_to_delete.transactions_root(), - uncles: header_to_delete.uncles_hash() - }; - self.header_ids.remove(&header_id); - } - self.downloading_bodies.remove(&n); - self.downloading_headers.remove(&n); - } - self.headers.remove_tail(&start); - self.bodies.remove_tail(&start); - } - - /// Request headers from a peer by block hash - fn request_headers_by_hash(&mut self, sync: &mut SyncIo, peer_id: PeerId, h: &H256, count: usize, skip: usize, reverse: bool) { - trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, h); - let mut rlp = RlpStream::new_list(4); - rlp.append(h); - rlp.append(&count); - rlp.append(&skip); - rlp.append(&if reverse {1u32} else {0u32}); - self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); - } - - /// Request headers from a peer by block number - fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool) { - let mut rlp = RlpStream::new_list(4); - trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, n); - rlp.append(&n); - rlp.append(&count); - rlp.append(&skip); - rlp.append(&if reverse {1u32} else {0u32}); - self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); - } - - /// Request block bodies from a peer - fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec) { - let mut rlp = RlpStream::new_list(hashes.len()); - trace!(target: "sync", "{} <- GetBlockBodies: {} entries", peer_id, hashes.len()); - for h in hashes { - rlp.append(&h); - } - self.send_request(sync, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out()); - } - - /// Reset peer status after request is complete. - fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - if peer.asking != asking { - warn!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); - } - else { - peer.asking = PeerAsking::Nothing; - } - } - - /// Generic request sender - fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { - { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - if peer.asking != PeerAsking::Nothing { - warn!(target:"sync", "Asking {:?} while requesting {:?}", asking, peer.asking); - } - } - match sync.send(peer_id, packet_id, packet) { - Err(e) => { - warn!(target:"sync", "Error sending request: {:?}", e); - sync.disable_peer(peer_id); - self.on_peer_aborting(sync, peer_id); - } - Ok(_) => { - let mut peer = self.peers.get_mut(&peer_id).unwrap(); - peer.asking = asking; - } - } - } - - /// Called when peer sends us new transactions - fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - Ok(()) - } - - /// Send Status message - fn send_status(&mut self, io: &mut SyncIo, peer_id: PeerId) { - let mut packet = RlpStream::new_list(5); - let chain = io.chain().chain_info(); - packet.append(&(PROTOCOL_VERSION as u32)); - packet.append(&NETWORK_ID); //TODO: network id - packet.append(&chain.total_difficulty); - packet.append(&chain.best_block_hash); - packet.append(&chain.genesis_hash); - //TODO: handle timeout for status request - if let Err(e) = io.send(peer_id, STATUS_PACKET, packet.out()) { - warn!(target:"sync", "Error sending status request: {:?}", e); - io.disable_peer(peer_id); - } - } - - /// Respond to GetBlockHeaders request - fn return_block_headers(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - // Packet layout: - // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] - let max_headers: usize = try!(r.val_at(1)); - let skip: usize = try!(r.val_at(2)); - let reverse: bool = try!(r.val_at(3)); - let last = io.chain().chain_info().best_block_number; - let mut number = if try!(r.at(0)).size() == 32 { - // id is a hash - let hash: H256 = try!(r.val_at(0)); - trace!(target: "sync", "-> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse); - match io.chain().block_header(&hash) { - Some(hdr) => From::from(HeaderView::new(&hdr).number()), - None => last - } - } - else { - trace!(target: "sync", "-> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", try!(r.val_at::(0)), max_headers, skip, reverse); - try!(r.val_at(0)) - }; - - if reverse { - number = min(last, number); - } else { - number = max(1, number); - } - let max_count = min(MAX_HEADERS_TO_SEND, max_headers); - let mut count = 0; - let mut data = Bytes::new(); - let inc = (skip + 1) as BlockNumber; - while number <= last && number > 0 && count < max_count { - if let Some(mut hdr) = io.chain().block_header_at(number) { - data.append(&mut hdr); - count += 1; - } - if reverse { - if number <= inc { - break; - } - number -= inc; - } - else { - number += inc; - } - } - let mut rlp = RlpStream::new_list(count as usize); - rlp.append_raw(&data, count as usize); - io.respond(BLOCK_HEADERS_PACKET, rlp.out()).unwrap_or_else(|e| - debug!(target: "sync", "Error sending headers: {:?}", e)); - trace!(target: "sync", "-> GetBlockHeaders: returned {} entries", count); - Ok(()) - } - - /// Respond to GetBlockBodies request - fn return_block_bodies(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let mut count = r.item_count(); - if count == 0 { - debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); - return Ok(()); - } - trace!(target: "sync", "-> GetBlockBodies: {} entries", count); - count = min(count, MAX_BODIES_TO_SEND); - let mut added = 0usize; - let mut data = Bytes::new(); - for i in 0..count { - if let Some(mut hdr) = io.chain().block_body(&try!(r.val_at::(i))) { - data.append(&mut hdr); - added += 1; - } - } - let mut rlp = RlpStream::new_list(added); - rlp.append_raw(&data, added); - io.respond(BLOCK_BODIES_PACKET, rlp.out()).unwrap_or_else(|e| - debug!(target: "sync", "Error sending headers: {:?}", e)); - trace!(target: "sync", "-> GetBlockBodies: returned {} entries", added); - Ok(()) - } - - /// Respond to GetNodeData request - fn return_node_data(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let mut count = r.item_count(); - if count == 0 { - debug!(target: "sync", "Empty GetNodeData request, ignoring."); - return Ok(()); - } - count = min(count, MAX_NODE_DATA_TO_SEND); - let mut added = 0usize; - let mut data = Bytes::new(); - for i in 0..count { - if let Some(mut hdr) = io.chain().state_data(&try!(r.val_at::(i))) { - data.append(&mut hdr); - added += 1; - } - } - let mut rlp = RlpStream::new_list(added); - rlp.append_raw(&data, added); - io.respond(NODE_DATA_PACKET, rlp.out()).unwrap_or_else(|e| - debug!(target: "sync", "Error sending headers: {:?}", e)); - Ok(()) - } - - /// Respond to GetReceipts request - fn return_receipts(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let mut count = r.item_count(); - if count == 0 { - debug!(target: "sync", "Empty GetReceipts request, ignoring."); - return Ok(()); - } - count = min(count, MAX_RECEIPTS_TO_SEND); - let mut added = 0usize; - let mut data = Bytes::new(); - for i in 0..count { - if let Some(mut hdr) = io.chain().block_receipts(&try!(r.val_at::(i))) { - data.append(&mut hdr); - added += 1; - } - } - let mut rlp = RlpStream::new_list(added); - rlp.append_raw(&data, added); - io.respond(RECEIPTS_PACKET, rlp.out()).unwrap_or_else(|e| - debug!(target: "sync", "Error sending headers: {:?}", e)); - Ok(()) - } - - /// Dispatch incoming requests and responses - pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { - let rlp = UntrustedRlp::new(data); - let result = match packet_id { - STATUS_PACKET => self.on_peer_status(io, peer, &rlp), - TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), - GET_BLOCK_HEADERS_PACKET => self.return_block_headers(io, &rlp), - BLOCK_HEADERS_PACKET => self.on_peer_block_headers(io, peer, &rlp), - GET_BLOCK_BODIES_PACKET => self.return_block_bodies(io, &rlp), - BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp), - NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp), - NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), - GET_NODE_DATA_PACKET => self.return_node_data(io, &rlp), - GET_RECEIPTS_PACKET => self.return_receipts(io, &rlp), - _ => { - debug!(target: "sync", "Unknown packet {}", packet_id); - Ok(()) - } - }; - result.unwrap_or_else(|e| { - debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); - }) - } - - /// Maintain other peers. Send out any new blocks and transactions - pub fn _maintain_sync(&mut self, _io: &mut SyncIo) { - } -} - diff --git a/sync/io.rs b/sync/io.rs deleted file mode 100644 index 501d36ad2..000000000 --- a/sync/io.rs +++ /dev/null @@ -1,62 +0,0 @@ -use client::BlockChainClient; -use util::{NetworkContext, PeerId, PacketId,}; -use util::error::UtilError; -use service::SyncMessage; - -/// IO interface for the syning handler. -/// Provides peer connection management and an interface to the blockchain client. -// TODO: ratings -pub trait SyncIo { - /// Disable a peer - fn disable_peer(&mut self, peer_id: PeerId); - /// Respond to current request with a packet. Can be called from an IO handler for incoming packet. - fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError>; - /// Send a packet to a peer. - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError>; - /// Get the blockchain - fn chain(&self) -> &BlockChainClient; - /// Returns peer client identifier string - fn peer_info(&self, peer_id: PeerId) -> String { - peer_id.to_string() - } -} - -/// Wraps `NetworkContext` and the blockchain client -pub struct NetSyncIo<'s, 'h> where 'h: 's { - network: &'s NetworkContext<'h, SyncMessage>, - chain: &'s BlockChainClient -} - -impl<'s, 'h> NetSyncIo<'s, 'h> { - /// Creates a new instance from the `NetworkContext` and the blockchain client reference. - pub fn new(network: &'s NetworkContext<'h, SyncMessage>, chain: &'s BlockChainClient) -> NetSyncIo<'s, 'h> { - NetSyncIo { - network: network, - chain: chain, - } - } -} - -impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> { - fn disable_peer(&mut self, peer_id: PeerId) { - self.network.disable_peer(peer_id); - } - - fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError>{ - self.network.respond(packet_id, data) - } - - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError>{ - self.network.send(peer_id, packet_id, data) - } - - fn chain(&self) -> &BlockChainClient { - self.chain - } - - fn peer_info(&self, peer_id: PeerId) -> String { - self.network.peer_info(peer_id) - } -} - - diff --git a/sync/mod.rs b/sync/mod.rs deleted file mode 100644 index 34a1e429d..000000000 --- a/sync/mod.rs +++ /dev/null @@ -1,94 +0,0 @@ -/// Blockchain sync module -/// Implements ethereum protocol version 63 as specified here: -/// https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol -/// -/// Usage example: -/// -/// ```rust -/// extern crate ethcore_util as util; -/// extern crate ethcore; -/// use std::env; -/// use std::sync::Arc; -/// use util::network::{NetworkService, NetworkConfiguration}; -/// use ethcore::client::Client; -/// use ethcore::sync::EthSync; -/// use ethcore::ethereum; -/// -/// fn main() { -/// let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); -/// let dir = env::temp_dir(); -/// let client = Client::new(ethereum::new_frontier(), &dir, service.io().channel()).unwrap(); -/// EthSync::register(&mut service, client); -/// } -/// ``` - -use std::ops::*; -use std::sync::*; -use client::Client; -use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; -use sync::chain::ChainSync; -use service::SyncMessage; -use sync::io::NetSyncIo; - -mod chain; -mod io; -mod range_collection; - -#[cfg(test)] -mod tests; - -/// Ethereum network protocol handler -pub struct EthSync { - /// Shared blockchain client. TODO: this should evetually become an IPC endpoint - chain: Arc, - /// Sync strategy - sync: RwLock -} - -pub use self::chain::SyncStatus; - -impl EthSync { - /// Creates and register protocol with the network service - pub fn register(service: &mut NetworkService, chain: Arc) -> Arc { - let sync = Arc::new(EthSync { - chain: chain, - sync: RwLock::new(ChainSync::new()), - }); - service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); - sync - } - - /// Get sync status - pub fn status(&self) -> SyncStatus { - self.sync.read().unwrap().status() - } - - /// Stop sync - pub fn stop(&mut self, io: &mut NetworkContext) { - self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref())); - } - - /// Restart sync - pub fn restart(&mut self, io: &mut NetworkContext) { - self.sync.write().unwrap().restart(&mut NetSyncIo::new(io, self.chain.deref())); - } -} - -impl NetworkProtocolHandler for EthSync { - fn initialize(&self, _io: &NetworkContext) { - } - - fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { - self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data); - } - - fn connected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.write().unwrap().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer); - } - - fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer); - } -} - - diff --git a/sync/range_collection.rs b/sync/range_collection.rs deleted file mode 100644 index b8186e5a5..000000000 --- a/sync/range_collection.rs +++ /dev/null @@ -1,260 +0,0 @@ -/// This module defines a trait for a collection of ranged values and an implementation -/// for this trait over sorted vector. - -use std::ops::{Add, Sub, Range}; - -pub trait ToUsize { - fn to_usize(&self) -> usize; -} - -pub trait FromUsize { - fn from_usize(s: usize) -> Self; -} - -/// A key-value collection orderd by key with sequential key-value pairs grouped together. -/// Such group is called a range. -/// E.g. a set of collection of 5 pairs {1, a}, {2, b}, {10, x}, {11, y}, {12, z} will be grouped into two ranges: {1, [a,b]}, {10, [x,y,z]} -pub trait RangeCollection { - /// Check if the given key is present in the collection. - fn have_item(&self, key: &K) -> bool; - /// Get value by key. - fn find_item(&self, key: &K) -> Option<&V>; - /// Get a range of keys from `key` till the end of the range that has `key` - /// Returns an empty range is key does not exist. - fn get_tail(&mut self, key: &K) -> Range; - /// Remove all elements < `start` in the range that contains `start` - 1 - fn remove_head(&mut self, start: &K); - /// Remove all elements >= `start` in the range that contains `start` - fn remove_tail(&mut self, start: &K); - /// Remove all elements >= `tail` - fn insert_item(&mut self, key: K, value: V); - /// Get an iterator over ranges - fn range_iter(& self) -> RangeIterator; -} - -/// Range iterator. For each range yelds a key for the first element of the range and a vector of values. -pub struct RangeIterator<'c, K:'c, V:'c> { - range: usize, - collection: &'c Vec<(K, Vec)> -} - -impl<'c, K:'c, V:'c> Iterator for RangeIterator<'c, K, V> where K: Add + FromUsize + ToUsize + Copy { - type Item = (K, &'c [V]); - // The 'Iterator' trait only requires the 'next' method to be defined. The - // return type is 'Option', 'None' is returned when the 'Iterator' is - // over, otherwise the next value is returned wrapped in 'Some' - fn next(&mut self) -> Option<(K, &'c [V])> { - if self.range > 0 { - self.range -= 1; - } - else { - return None; - } - match self.collection.get(self.range) { - Some(&(ref k, ref vec)) => { - Some((*k, &vec)) - }, - None => None - } - } -} - -impl RangeCollection for Vec<(K, Vec)> where K: Ord + PartialEq + Add + Sub + Copy + FromUsize + ToUsize { - fn range_iter(&self) -> RangeIterator { - RangeIterator { - range: self.len(), - collection: self - } - } - - fn have_item(&self, key: &K) -> bool { - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(_) => true, - Err(index) => match self.get(index) { - Some(&(ref k, ref v)) => k <= key && (*k + FromUsize::from_usize(v.len())) > *key, - _ => false - }, - } - } - - fn find_item(&self, key: &K) -> Option<&V> { - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(index) => self.get(index).unwrap().1.get(0), - Err(index) => match self.get(index) { - Some(&(ref k, ref v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => v.get((*key - *k).to_usize()), - _ => None - }, - } - } - - fn get_tail(&mut self, key: &K) -> Range { - let kv = *key; - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(index) => kv..(kv + FromUsize::from_usize(self[index].1.len())), - Err(index) => { - match self.get_mut(index) { - Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { - kv..(*k + FromUsize::from_usize(v.len())) - } - _ => kv..kv - } - }, - } - } - /// Remove element key and following elements in the same range - fn remove_tail(&mut self, key: &K) { - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(index) => { self.remove(index); }, - Err(index) =>{ - let mut empty = false; - match self.get_mut(index) { - Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { - v.truncate((*key - *k).to_usize()); - empty = v.is_empty(); - } - _ => {} - } - if empty { - self.remove(index); - } - }, - } - } - - /// Remove range elements up to key - fn remove_head(&mut self, key: &K) { - if *key == FromUsize::from_usize(0) { - return - } - - let prev = *key - FromUsize::from_usize(1); - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(_) => { }, //start of range, do nothing. - Err(index) => { - let mut empty = false; - match self.get_mut(index) { - Some(&mut (ref mut k, ref mut v)) if *k <= prev && (*k + FromUsize::from_usize(v.len())) > prev => { - let tail = v.split_off((*key - *k).to_usize()); - empty = tail.is_empty(); - let removed = ::std::mem::replace(v, tail); - let new_k = *k + FromUsize::from_usize(removed.len()); - ::std::mem::replace(k, new_k); - } - _ => {} - } - if empty { - self.remove(index); - } - }, - } - } - - fn insert_item(&mut self, key: K, value: V) { - assert!(!self.have_item(&key)); - - let lower = match self.binary_search_by(|&(k, _)| k.cmp(&key).reverse()) { - Ok(index) => index, - Err(index) => index, - }; - - let mut to_remove: Option = None; - if lower < self.len() && self[lower].0 + FromUsize::from_usize(self[lower].1.len()) == key { - // extend into existing chunk - self[lower].1.push(value); - } - else { - // insert a new chunk - let range: Vec = vec![value]; - self.insert(lower, (key, range)); - }; - if lower > 0 { - let next = lower - 1; - if next < self.len() - { - { - let (mut next, mut inserted) = self.split_at_mut(lower); - let mut next = next.last_mut().unwrap(); - let mut inserted = inserted.first_mut().unwrap(); - if next.0 == key + FromUsize::from_usize(1) - { - inserted.1.append(&mut next.1); - to_remove = Some(lower - 1); - } - } - - if let Some(r) = to_remove { - self.remove(r); - } - } - } - } -} - -#[test] -#[allow(cyclomatic_complexity)] -fn test_range() { - use std::cmp::{Ordering}; - - let mut ranges: Vec<(u64, Vec)> = Vec::new(); - assert_eq!(ranges.range_iter().next(), None); - assert_eq!(ranges.find_item(&1), None); - assert!(!ranges.have_item(&1)); - assert_eq!(ranges.get_tail(&0), 0..0); - - ranges.insert_item(17, 'q'); - assert_eq!(ranges.range_iter().cmp(vec![(17, &['q'][..])]), Ordering::Equal); - assert_eq!(ranges.find_item(&17), Some(&'q')); - assert!(ranges.have_item(&17)); - assert_eq!(ranges.get_tail(&17), 17..18); - - ranges.insert_item(18, 'r'); - assert_eq!(ranges.range_iter().cmp(vec![(17, &['q', 'r'][..])]), Ordering::Equal); - assert_eq!(ranges.find_item(&18), Some(&'r')); - assert!(ranges.have_item(&18)); - assert_eq!(ranges.get_tail(&17), 17..19); - - ranges.insert_item(16, 'p'); - assert_eq!(ranges.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal); - assert_eq!(ranges.find_item(&16), Some(&'p')); - assert_eq!(ranges.find_item(&17), Some(&'q')); - assert_eq!(ranges.find_item(&18), Some(&'r')); - assert!(ranges.have_item(&16)); - assert_eq!(ranges.get_tail(&17), 17..19); - assert_eq!(ranges.get_tail(&16), 16..19); - - ranges.insert_item(2, 'b'); - assert_eq!(ranges.range_iter().cmp(vec![(2, &['b'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - assert_eq!(ranges.find_item(&2), Some(&'b')); - - ranges.insert_item(3, 'c'); - ranges.insert_item(4, 'd'); - assert_eq!(ranges.get_tail(&3), 3..5); - assert_eq!(ranges.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - - let mut r = ranges.clone(); - r.remove_head(&1); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&2); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&3); - assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&10); - assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&5); - assert_eq!(r.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&19); - assert_eq!(r.range_iter().next(), None); - - let mut r = ranges.clone(); - r.remove_tail(&20); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_tail(&17); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal); - r.remove_tail(&16); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal); - r.remove_tail(&3); - assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal); - r.remove_tail(&2); - assert_eq!(r.range_iter().next(), None); -} - diff --git a/sync/tests.rs b/sync/tests.rs deleted file mode 100644 index eb09b467a..000000000 --- a/sync/tests.rs +++ /dev/null @@ -1,406 +0,0 @@ -use util::*; -use client::{BlockChainClient, BlockStatus, TreeRoute, BlockChainInfo}; -use block_queue::BlockQueueInfo; -use header::{Header as BlockHeader, BlockNumber}; -use error::*; -use sync::io::SyncIo; -use sync::chain::{ChainSync, SyncState}; - -struct TestBlockChainClient { - blocks: RwLock>, - numbers: RwLock>, - genesis_hash: H256, - last_hash: RwLock, - difficulty: RwLock, -} - -impl TestBlockChainClient { - fn new() -> TestBlockChainClient { - - let mut client = TestBlockChainClient { - blocks: RwLock::new(HashMap::new()), - numbers: RwLock::new(HashMap::new()), - genesis_hash: H256::new(), - last_hash: RwLock::new(H256::new()), - difficulty: RwLock::new(From::from(0)), - }; - client.add_blocks(1, true); // add genesis block - client.genesis_hash = client.last_hash.read().unwrap().clone(); - client - } - - pub fn add_blocks(&mut self, count: usize, empty: bool) { - let len = self.numbers.read().unwrap().len(); - for n in len..(len + count) { - let mut header = BlockHeader::new(); - header.difficulty = From::from(n); - header.parent_hash = self.last_hash.read().unwrap().clone(); - header.number = n as BlockNumber; - let mut uncles = RlpStream::new_list(if empty {0} else {1}); - if !empty { - uncles.append(&H256::from(&U256::from(n))); - header.uncles_hash = uncles.as_raw().sha3(); - } - let mut rlp = RlpStream::new_list(3); - rlp.append(&header); - rlp.append_raw(&rlp::NULL_RLP, 1); - rlp.append_raw(uncles.as_raw(), 1); - self.import_block(rlp.as_raw().to_vec()).unwrap(); - } - } -} - -impl BlockChainClient for TestBlockChainClient { - fn block_total_difficulty(&self, _h: &H256) -> Option { - unimplemented!(); - } - - fn block_header(&self, h: &H256) -> Option { - self.blocks.read().unwrap().get(h).map(|r| Rlp::new(r).at(0).as_raw().to_vec()) - - } - - fn block_body(&self, h: &H256) -> Option { - self.blocks.read().unwrap().get(h).map(|r| { - let mut stream = RlpStream::new_list(2); - stream.append_raw(Rlp::new(&r).at(1).as_raw(), 1); - stream.append_raw(Rlp::new(&r).at(2).as_raw(), 1); - stream.out() - }) - } - - fn block(&self, h: &H256) -> Option { - self.blocks.read().unwrap().get(h).cloned() - } - - fn block_status(&self, h: &H256) -> BlockStatus { - match self.blocks.read().unwrap().get(h) { - Some(_) => BlockStatus::InChain, - None => BlockStatus::Unknown - } - } - - fn block_total_difficulty_at(&self, _number: BlockNumber) -> Option { - unimplemented!(); - } - - fn block_header_at(&self, n: BlockNumber) -> Option { - self.numbers.read().unwrap().get(&(n as usize)).and_then(|h| self.block_header(h)) - } - - fn block_body_at(&self, n: BlockNumber) -> Option { - self.numbers.read().unwrap().get(&(n as usize)).and_then(|h| self.block_body(h)) - } - - fn block_at(&self, n: BlockNumber) -> Option { - self.numbers.read().unwrap().get(&(n as usize)).map(|h| self.blocks.read().unwrap().get(h).unwrap().clone()) - } - - fn block_status_at(&self, n: BlockNumber) -> BlockStatus { - if (n as usize) < self.blocks.read().unwrap().len() { - BlockStatus::InChain - } else { - BlockStatus::Unknown - } - } - - fn tree_route(&self, _from: &H256, _to: &H256) -> Option { - Some(TreeRoute { - blocks: Vec::new(), - ancestor: H256::new(), - index: 0 - }) - } - - fn state_data(&self, _h: &H256) -> Option { - None - } - - fn block_receipts(&self, _h: &H256) -> Option { - None - } - - fn import_block(&self, b: Bytes) -> ImportResult { - let header = Rlp::new(&b).val_at::(0); - let h = header.hash(); - let number: usize = header.number as usize; - if number > self.blocks.read().unwrap().len() { - panic!("Unexpected block number. Expected {}, got {}", self.blocks.read().unwrap().len(), number); - } - if number > 0 { - match self.blocks.read().unwrap().get(&header.parent_hash) { - Some(parent) => { - let parent = Rlp::new(parent).val_at::(0); - if parent.number != (header.number - 1) { - panic!("Unexpected block parent"); - } - }, - None => { - panic!("Unknown block parent {:?} for block {}", header.parent_hash, number); - } - } - } - let len = self.numbers.read().unwrap().len(); - if number == len { - *self.difficulty.write().unwrap().deref_mut() += header.difficulty; - mem::replace(self.last_hash.write().unwrap().deref_mut(), h.clone()); - self.blocks.write().unwrap().insert(h.clone(), b); - self.numbers.write().unwrap().insert(number, h.clone()); - let mut parent_hash = header.parent_hash; - if number > 0 { - let mut n = number - 1; - while n > 0 && self.numbers.read().unwrap()[&n] != parent_hash { - *self.numbers.write().unwrap().get_mut(&n).unwrap() = parent_hash.clone(); - n -= 1; - parent_hash = Rlp::new(&self.blocks.read().unwrap()[&parent_hash]).val_at::(0).parent_hash; - } - } - } - else { - self.blocks.write().unwrap().insert(h.clone(), b.to_vec()); - } - Ok(h) - } - - fn queue_info(&self) -> BlockQueueInfo { - BlockQueueInfo { - full: false, - verified_queue_size: 0, - unverified_queue_size: 0, - verifying_queue_size: 0, - } - } - - fn clear_queue(&self) { - } - - fn chain_info(&self) -> BlockChainInfo { - BlockChainInfo { - total_difficulty: *self.difficulty.read().unwrap(), - pending_total_difficulty: *self.difficulty.read().unwrap(), - genesis_hash: self.genesis_hash.clone(), - best_block_hash: self.last_hash.read().unwrap().clone(), - best_block_number: self.blocks.read().unwrap().len() as BlockNumber - 1, - } - } -} - -struct TestIo<'p> { - chain: &'p mut TestBlockChainClient, - queue: &'p mut VecDeque, - sender: Option, -} - -impl<'p> TestIo<'p> { - fn new(chain: &'p mut TestBlockChainClient, queue: &'p mut VecDeque, sender: Option) -> TestIo<'p> { - TestIo { - chain: chain, - queue: queue, - sender: sender - } - } -} - -impl<'p> SyncIo for TestIo<'p> { - fn disable_peer(&mut self, _peer_id: PeerId) { - } - - fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { - self.queue.push_back(TestPacket { - data: data, - packet_id: packet_id, - recipient: self.sender.unwrap() - }); - Ok(()) - } - - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { - self.queue.push_back(TestPacket { - data: data, - packet_id: packet_id, - recipient: peer_id, - }); - Ok(()) - } - - fn chain(&self) -> &BlockChainClient { - self.chain - } -} - -struct TestPacket { - data: Bytes, - packet_id: PacketId, - recipient: PeerId, -} - -struct TestPeer { - chain: TestBlockChainClient, - sync: ChainSync, - queue: VecDeque, -} - -struct TestNet { - peers: Vec, - started: bool -} - -impl TestNet { - pub fn new(n: usize) -> TestNet { - let mut net = TestNet { - peers: Vec::new(), - started: false - }; - for _ in 0..n { - net.peers.push(TestPeer { - chain: TestBlockChainClient::new(), - sync: ChainSync::new(), - queue: VecDeque::new(), - }); - } - net - } - - pub fn peer(&self, i: usize) -> &TestPeer { - self.peers.get(i).unwrap() - } - - pub fn peer_mut(&mut self, i: usize) -> &mut TestPeer { - self.peers.get_mut(i).unwrap() - } - - pub fn start(&mut self) { - for peer in 0..self.peers.len() { - for client in 0..self.peers.len() { - if peer != client { - let mut p = self.peers.get_mut(peer).unwrap(); - p.sync.on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId); - } - } - } - } - - pub fn sync_step(&mut self) { - for peer in 0..self.peers.len() { - if let Some(packet) = self.peers[peer].queue.pop_front() { - let mut p = self.peers.get_mut(packet.recipient).unwrap(); - trace!("--- {} -> {} ---", peer, packet.recipient); - p.sync.on_packet(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data); - trace!("----------------"); - } - let mut p = self.peers.get_mut(peer).unwrap(); - p.sync._maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); - } - } - - pub fn restart_peer(&mut self, i: usize) { - let peer = self.peer_mut(i); - peer.sync.restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); - } - - pub fn sync(&mut self) -> u32 { - self.start(); - let mut total_steps = 0; - while !self.done() { - self.sync_step(); - total_steps = total_steps + 1; - } - total_steps - } - - pub fn sync_steps(&mut self, count: usize) { - if !self.started { - self.start(); - self.started = true; - } - for _ in 0..count { - self.sync_step(); - } - } - - pub fn done(&self) -> bool { - self.peers.iter().all(|p| p.queue.is_empty()) - } -} - -#[test] -fn chain_two_peers() { - let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); - net.sync(); - assert!(net.peer(0).chain.block_at(1000).is_some()); - assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); -} - -#[test] -fn chain_status_after_sync() { - let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); - net.sync(); - let status = net.peer(0).sync.status(); - assert_eq!(status.state, SyncState::Idle); -} - -#[test] -fn chain_takes_few_steps() { - let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(100, false); - net.peer_mut(2).chain.add_blocks(100, false); - let total_steps = net.sync(); - assert!(total_steps < 7); -} - -#[test] -fn chain_empty_blocks() { - let mut net = TestNet::new(3); - for n in 0..200 { - net.peer_mut(1).chain.add_blocks(5, n % 2 == 0); - net.peer_mut(2).chain.add_blocks(5, n % 2 == 0); - } - net.sync(); - assert!(net.peer(0).chain.block_at(1000).is_some()); - assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); -} - -#[test] -fn chain_forked() { - let mut net = TestNet::new(3); - net.peer_mut(0).chain.add_blocks(300, false); - net.peer_mut(1).chain.add_blocks(300, false); - net.peer_mut(2).chain.add_blocks(300, false); - net.peer_mut(0).chain.add_blocks(100, true); //fork - net.peer_mut(1).chain.add_blocks(200, false); - net.peer_mut(2).chain.add_blocks(200, false); - net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2 - net.peer_mut(2).chain.add_blocks(10, true); - // peer 1 has the best chain of 601 blocks - let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); - net.sync(); - assert_eq!(net.peer(0).chain.numbers.read().unwrap().deref(), &peer1_chain); - assert_eq!(net.peer(1).chain.numbers.read().unwrap().deref(), &peer1_chain); - assert_eq!(net.peer(2).chain.numbers.read().unwrap().deref(), &peer1_chain); -} - -#[test] -fn chain_restart() { - let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); - - net.sync_steps(8); - - // make sure that sync has actually happened - assert!(net.peer(0).chain.chain_info().best_block_number > 100); - net.restart_peer(0); - - let status = net.peer(0).sync.status(); - assert_eq!(status.state, SyncState::NotSynced); -} - -#[test] -fn chain_status_empty() { - let net = TestNet::new(2); - assert_eq!(net.peer(0).sync.status().state, SyncState::NotSynced); -}