From 1e91b6f5dbb6c06814195f5ba891c25f05a2ad24 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 29 Jan 2016 15:56:06 +0100 Subject: [PATCH 1/3] Moved sync to a separate crate --- bin/Cargo.toml | 1 + bin/src/main.rs | 9 +- src/lib.rs | 2 - src/service.rs | 18 +- sync/Cargo.toml | 16 + {src/sync => sync}/chain.rs | 0 {src/sync => sync}/io.rs | 0 {src/sync => sync}/mod.rs | 0 {src/sync => sync}/range_collection.rs | 0 sync/src/chain.rs | 969 +++++++++++++++++++++++++ sync/src/io.rs | 62 ++ sync/src/lib.rs | 106 +++ sync/src/range_collection.rs | 260 +++++++ sync/src/service.rs | 104 +++ sync/src/tests.rs | 349 +++++++++ {src/sync => sync}/tests.rs | 0 16 files changed, 1880 insertions(+), 16 deletions(-) create mode 100644 sync/Cargo.toml rename {src/sync => sync}/chain.rs (100%) rename {src/sync => sync}/io.rs (100%) rename {src/sync => sync}/mod.rs (100%) rename {src/sync => sync}/range_collection.rs (100%) create mode 100644 sync/src/chain.rs create mode 100644 sync/src/io.rs create mode 100644 sync/src/lib.rs create mode 100644 sync/src/range_collection.rs create mode 100644 sync/src/service.rs create mode 100644 sync/src/tests.rs rename {src/sync => sync}/tests.rs (100%) diff --git a/bin/Cargo.toml b/bin/Cargo.toml index 7174ada14..8fc233796 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -15,6 +15,7 @@ ctrlc = "1.0" ethcore-util = { path = "../util" } ethcore-rpc = { path = "../rpc", optional = true } ethcore = { path = ".." } +ethsync = { path = "../sync" } clippy = "0.0.37" [features] diff --git a/bin/src/main.rs b/bin/src/main.rs index 3d4199fcf..92f0cbf20 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -8,6 +8,7 @@ extern crate docopt; extern crate rustc_serialize; extern crate ethcore_util as util; extern crate ethcore; +extern crate ethsync; extern crate log; extern crate env_logger; extern crate ctrlc; @@ -24,7 +25,7 @@ use ethcore::client::*; use ethcore::service::{ClientService, NetSyncMessage}; use ethcore::ethereum; use ethcore::blockchain::CacheSize; -use ethcore::sync::EthSync; +use ethsync::EthSync; docopt!(Args derive Debug, " Parity. Ethereum Client. @@ -81,8 +82,10 @@ fn main() { let mut net_settings = NetworkConfiguration::new(); net_settings.boot_nodes = init_nodes; let mut service = ClientService::start(spec, net_settings).unwrap(); - setup_rpc_server(service.client(), service.sync()); - let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: service.sync() }); + let client = service.client().clone(); + let sync = EthSync::register(service.network(), client); + setup_rpc_server(service.client(), sync.clone()); + let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: sync }); service.io().register_handler(io_handler).expect("Error registering IO handler"); let exit = Arc::new(Condvar::new()); diff --git a/src/lib.rs b/src/lib.rs index e084635dd..0652d964e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,8 +151,6 @@ mod tests; /// TODO [arkpar] Please document me pub mod client; /// TODO [arkpar] Please document me -pub mod sync; -/// TODO [arkpar] Please document me pub mod block; /// TODO [arkpar] Please document me pub mod verification; diff --git a/src/service.rs b/src/service.rs index 8c900d20a..a56ed7f44 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,5 +1,4 @@ use util::*; -use sync::*; use spec::Spec; use error::*; use std::env; @@ -21,7 +20,6 @@ pub type NetSyncMessage = NetworkIoMessage; pub struct ClientService { net_service: NetworkService, client: Arc, - sync: Arc, } impl ClientService { @@ -34,7 +32,6 @@ impl ClientService { dir.push(".parity"); dir.push(H64::from(spec.genesis_header().hash()).hex()); let client = try!(Client::new(spec, &dir, net_service.io().channel())); - let sync = EthSync::register(&mut net_service, client.clone()); let client_io = Arc::new(ClientIoHandler { client: client.clone() }); @@ -43,29 +40,28 @@ impl ClientService { Ok(ClientService { net_service: net_service, client: client, - sync: sync, }) } - /// Get the network service. + /// Add a node to network pub fn add_node(&mut self, _enode: &str) { unimplemented!(); } - /// TODO [arkpar] Please document me + /// Get general IO interface pub fn io(&mut self) -> &mut IoService { self.net_service.io() } - /// TODO [arkpar] Please document me + /// Get client interface pub fn client(&self) -> Arc { self.client.clone() } - - /// Get shared sync handler - pub fn sync(&self) -> Arc { - self.sync.clone() + + /// Get network service component + pub fn network(&mut self) -> &mut NetworkService { + &mut self.net_service } } diff --git a/sync/Cargo.toml b/sync/Cargo.toml new file mode 100644 index 000000000..c3ae470fd --- /dev/null +++ b/sync/Cargo.toml @@ -0,0 +1,16 @@ +[package] +description = "Ethcore blockchain sync" +name = "ethsync" +version = "0.1.0" +license = "GPL-3.0" +authors = ["Ethcore usize { + *self as usize + } +} + +impl FromUsize for BlockNumber { + fn from_usize(s: usize) -> BlockNumber { + s as BlockNumber + } +} + +type PacketDecodeError = DecoderError; + +const PROTOCOL_VERSION: u8 = 63u8; +const MAX_BODIES_TO_SEND: usize = 256; +const MAX_HEADERS_TO_SEND: usize = 512; +const MAX_NODE_DATA_TO_SEND: usize = 1024; +const MAX_RECEIPTS_TO_SEND: usize = 1024; +const MAX_HEADERS_TO_REQUEST: usize = 512; +const MAX_BODIES_TO_REQUEST: usize = 256; + +const STATUS_PACKET: u8 = 0x00; +const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; +const TRANSACTIONS_PACKET: u8 = 0x02; +const GET_BLOCK_HEADERS_PACKET: u8 = 0x03; +const BLOCK_HEADERS_PACKET: u8 = 0x04; +const GET_BLOCK_BODIES_PACKET: u8 = 0x05; +const BLOCK_BODIES_PACKET: u8 = 0x06; +const NEW_BLOCK_PACKET: u8 = 0x07; + +const GET_NODE_DATA_PACKET: u8 = 0x0d; +const NODE_DATA_PACKET: u8 = 0x0e; +const GET_RECEIPTS_PACKET: u8 = 0x0f; +const RECEIPTS_PACKET: u8 = 0x10; + +const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent + +struct Header { + /// Header data + data: Bytes, + /// Block hash + hash: H256, + /// Parent hash + parent: H256, +} + +/// Used to identify header by transactions and uncles hashes +#[derive(Eq, PartialEq, Hash)] +struct HeaderId { + transactions_root: H256, + uncles: H256 +} + +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +/// Sync state +pub enum SyncState { + /// Initial chain sync has not started yet + NotSynced, + /// Initial chain sync complete. Waiting for new packets + Idle, + /// Block downloading paused. Waiting for block queue to process blocks and free some space + Waiting, + /// Downloading blocks + Blocks, + /// Downloading blocks learned from NewHashes packet + NewBlocks, +} + +/// Syncing status and statistics +pub struct SyncStatus { + /// State + pub state: SyncState, + /// Syncing protocol version. That's the maximum protocol version we connect to. + pub protocol_version: u8, + /// BlockChain height for the moment the sync started. + pub start_block_number: BlockNumber, + /// Last fully downloaded and imported block number. + pub last_imported_block_number: BlockNumber, + /// Highest block number in the download queue. + pub highest_block_number: BlockNumber, + /// Total number of blocks for the sync process. + pub blocks_total: usize, + /// Number of blocks downloaded so far. + pub blocks_received: usize, + /// Total number of connected peers + pub num_peers: usize, + /// Total number of active peers + pub num_active_peers: usize, +} + +#[derive(PartialEq, Eq, Debug)] +/// Peer data type requested +enum PeerAsking { + Nothing, + BlockHeaders, + BlockBodies, +} + +/// Syncing peer information +struct PeerInfo { + /// eth protocol version + protocol_version: u32, + /// Peer chain genesis hash + genesis: H256, + /// Peer network id + network_id: U256, + /// Peer best block hash + latest: H256, + /// Peer total difficulty + difficulty: U256, + /// Type of data currenty being requested from peer. + asking: PeerAsking, + /// A set of block numbers being requested + asking_blocks: Vec, +} + +/// Blockchain sync handler. +/// See module documentation for more details. +pub struct ChainSync { + /// Sync state + state: SyncState, + /// Last block number for the start of sync + starting_block: BlockNumber, + /// Highest block number seen + highest_block: BlockNumber, + /// Set of block header numbers being downloaded + downloading_headers: HashSet, + /// Set of block body numbers being downloaded + downloading_bodies: HashSet, + /// Downloaded headers. + headers: Vec<(BlockNumber, Vec
)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order + /// Downloaded bodies + bodies: Vec<(BlockNumber, Vec)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order + /// Peer info + peers: HashMap, + /// Used to map body to header + header_ids: HashMap, + /// Last impoted block number + last_imported_block: BlockNumber, + /// Last impoted block hash + last_imported_hash: H256, + /// Syncing total difficulty + syncing_difficulty: U256, + /// True if common block for our and remote chain has been found + have_common_block: bool, +} + + +impl ChainSync { + /// Create a new instance of syncing strategy. + pub fn new() -> ChainSync { + ChainSync { + state: SyncState::NotSynced, + starting_block: 0, + highest_block: 0, + downloading_headers: HashSet::new(), + downloading_bodies: HashSet::new(), + headers: Vec::new(), + bodies: Vec::new(), + peers: HashMap::new(), + header_ids: HashMap::new(), + last_imported_block: 0, + last_imported_hash: H256::new(), + syncing_difficulty: U256::from(0u64), + have_common_block: false, + } + } + + /// @returns Synchonization status + pub fn status(&self) -> SyncStatus { + SyncStatus { + state: self.state.clone(), + protocol_version: 63, + start_block_number: self.starting_block, + last_imported_block_number: self.last_imported_block, + highest_block_number: self.highest_block, + blocks_received: (self.last_imported_block - self.starting_block) as usize, + blocks_total: (self.highest_block - self.starting_block) as usize, + num_peers: self.peers.len(), + num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), + } + } + + /// Abort all sync activity + pub fn abort(&mut self, io: &mut SyncIo) { + self.restart(io); + self.peers.clear(); + } + + /// Rest sync. Clear all downloaded data but keep the queue + fn reset(&mut self) { + self.downloading_headers.clear(); + self.downloading_bodies.clear(); + self.headers.clear(); + self.bodies.clear(); + for (_, ref mut p) in &mut self.peers { + p.asking_blocks.clear(); + } + self.header_ids.clear(); + self.syncing_difficulty = From::from(0u64); + self.state = SyncState::Idle; + } + + /// Restart sync + pub fn restart(&mut self, io: &mut SyncIo) { + self.reset(); + self.last_imported_block = 0; + self.last_imported_hash = H256::new(); + self.starting_block = 0; + self.highest_block = 0; + self.have_common_block = false; + io.chain().clear_queue(); + self.starting_block = io.chain().chain_info().best_block_number; + self.state = SyncState::NotSynced; + } + + /// Called by peer to report status + fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let peer = PeerInfo { + protocol_version: try!(r.val_at(0)), + network_id: try!(r.val_at(1)), + difficulty: try!(r.val_at(2)), + latest: try!(r.val_at(3)), + genesis: try!(r.val_at(4)), + asking: PeerAsking::Nothing, + asking_blocks: Vec::new(), + }; + + trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); + + let chain_info = io.chain().chain_info(); + if peer.genesis != chain_info.genesis_hash { + io.disable_peer(peer_id); + trace!(target: "sync", "Peer {} genesis hash not matched", peer_id); + return Ok(()); + } + if peer.network_id != NETWORK_ID { + io.disable_peer(peer_id); + trace!(target: "sync", "Peer {} network id not matched", peer_id); + return Ok(()); + } + + let old = self.peers.insert(peer_id.clone(), peer); + if old.is_some() { + panic!("ChainSync: new peer already exists"); + } + info!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); + self.sync_peer(io, peer_id, false); + Ok(()) + } + + #[allow(cyclomatic_complexity)] + /// Called by peer once it has new block headers during sync + fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders); + let item_count = r.item_count(); + trace!(target: "sync", "{} -> BlockHeaders ({} entries)", peer_id, item_count); + self.clear_peer_download(peer_id); + if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { + trace!(target: "sync", "Ignored unexpected block headers"); + return Ok(()); + } + if self.state == SyncState::Waiting { + trace!(target: "sync", "Ignored block headers while waiting"); + return Ok(()); + } + + for i in 0..item_count { + let info: BlockHeader = try!(r.val_at(i)); + let number = BlockNumber::from(info.number); + if number <= self.last_imported_block || self.headers.have_item(&number) { + trace!(target: "sync", "Skipping existing block header"); + continue; + } + if number > self.highest_block { + self.highest_block = number; + } + let hash = info.hash(); + match io.chain().block_status(&hash) { + BlockStatus::InChain => { + self.have_common_block = true; + self.last_imported_block = number; + self.last_imported_hash = hash.clone(); + trace!(target: "sync", "Found common header {} ({})", number, hash); + }, + _ => { + if self.have_common_block { + //validate chain + if self.have_common_block && number == self.last_imported_block + 1 && info.parent_hash != self.last_imported_hash { + // TODO: lower peer rating + debug!(target: "sync", "Mismatched block header {} {}", number, hash); + continue; + } + if self.headers.find_item(&(number - 1)).map_or(false, |p| p.hash != info.parent_hash) { + // mismatching parent id, delete the previous block and don't add this one + // TODO: lower peer rating + debug!(target: "sync", "Mismatched block header {} {}", number, hash); + self.remove_downloaded_blocks(number - 1); + continue; + } + if self.headers.find_item(&(number + 1)).map_or(false, |p| p.parent != hash) { + // mismatching parent id for the next block, clear following headers + debug!(target: "sync", "Mismatched block header {}", number + 1); + self.remove_downloaded_blocks(number + 1); + } + } + let hdr = Header { + data: try!(r.at(i)).as_raw().to_vec(), + hash: hash.clone(), + parent: info.parent_hash, + }; + self.headers.insert_item(number, hdr); + let header_id = HeaderId { + transactions_root: info.transactions_root, + uncles: info.uncles_hash + }; + trace!(target: "sync", "Got header {} ({})", number, hash); + if header_id.transactions_root == rlp::SHA3_NULL_RLP && header_id.uncles == rlp::SHA3_EMPTY_LIST_RLP { + //empty body, just mark as downloaded + let mut body_stream = RlpStream::new_list(2); + body_stream.append_raw(&rlp::NULL_RLP, 1); + body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1); + self.bodies.insert_item(number, body_stream.out()); + } + else { + self.header_ids.insert(header_id, number); + } + } + } + } + self.collect_blocks(io); + self.continue_sync(io); + Ok(()) + } + + /// Called by peer once it has new block bodies + fn on_peer_block_bodies(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + use util::triehash::ordered_trie_root; + self.reset_peer_asking(peer_id, PeerAsking::BlockBodies); + let item_count = r.item_count(); + trace!(target: "sync", "{} -> BlockBodies ({} entries)", peer_id, item_count); + self.clear_peer_download(peer_id); + if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { + trace!(target: "sync", "Ignored unexpected block bodies"); + return Ok(()); + } + if self.state == SyncState::Waiting { + trace!(target: "sync", "Ignored block bodies while waiting"); + return Ok(()); + } + for i in 0..item_count { + let body = try!(r.at(i)); + let tx = try!(body.at(0)); + let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw().to_vec()).collect()); //TODO: get rid of vectors here + let uncles = try!(body.at(1)).as_raw().sha3(); + let header_id = HeaderId { + transactions_root: tx_root, + uncles: uncles + }; + match self.header_ids.get(&header_id).cloned() { + Some(n) => { + self.header_ids.remove(&header_id); + self.bodies.insert_item(n, body.as_raw().to_vec()); + trace!(target: "sync", "Got body {}", n); + } + None => { + debug!(target: "sync", "Ignored unknown block body"); + } + } + } + self.collect_blocks(io); + self.continue_sync(io); + Ok(()) + } + + /// Called by peer once it has new block bodies + fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let block_rlp = try!(r.at(0)); + let header_rlp = try!(block_rlp.at(0)); + let h = header_rlp.as_raw().sha3(); + + trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); + let header_view = HeaderView::new(header_rlp.as_raw()); + // TODO: Decompose block and add to self.headers and self.bodies instead + if header_view.number() == From::from(self.last_imported_block + 1) { + match io.chain().import_block(block_rlp.as_raw().to_vec()) { + Err(ImportError::AlreadyInChain) => { + trace!(target: "sync", "New block already in chain {:?}", h); + }, + Err(ImportError::AlreadyQueued) => { + trace!(target: "sync", "New block already queued {:?}", h); + }, + Ok(_) => { + trace!(target: "sync", "New block queued {:?}", h); + }, + Err(e) => { + debug!(target: "sync", "Bad new block {:?} : {:?}", h, e); + io.disable_peer(peer_id); + } + }; + } + else { + trace!(target: "sync", "New block unknown {:?}", h); + //TODO: handle too many unknown blocks + let difficulty: U256 = try!(r.val_at(1)); + let peer_difficulty = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").difficulty; + if difficulty > peer_difficulty { + trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); + { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + peer.latest = header_view.sha3(); + } + self.sync_peer(io, peer_id, true); + } + } + Ok(()) + } + + /// Handles NewHashes packet. Initiates headers download for any unknown hashes. + fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + if self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking != PeerAsking::Nothing { + trace!(target: "sync", "Ignoring new hashes since we're already downloading."); + return Ok(()); + } + trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()); + let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); + let mut max_height: U256 = From::from(0); + for (rh, rd) in hashes { + let h = try!(rh); + let d = try!(rd); + match io.chain().block_status(&h) { + BlockStatus::InChain => { + trace!(target: "sync", "New block hash already in chain {:?}", h); + }, + BlockStatus::Queued => { + trace!(target: "sync", "New hash block already queued {:?}", h); + }, + BlockStatus::Unknown => { + trace!(target: "sync", "New unknown block hash {:?}", h); + if d > max_height { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + peer.latest = h.clone(); + max_height = d; + } + }, + BlockStatus::Bad =>{ + debug!(target: "sync", "Bad new block hash {:?}", h); + io.disable_peer(peer_id); + return Ok(()); + } + } + }; + if max_height != x!(0) { + self.sync_peer(io, peer_id, true); + } + Ok(()) + } + + /// Called by peer when it is disconnecting + pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { + trace!(target: "sync", "== Disconnecting {}", peer); + if self.peers.contains_key(&peer) { + info!(target: "sync", "Disconnected {}:{}", peer, io.peer_info(peer)); + self.clear_peer_download(peer); + self.peers.remove(&peer); + self.continue_sync(io); + } + } + + /// Called when a new peer is connected + pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) { + trace!(target: "sync", "== Connected {}", peer); + self.send_status(io, peer); + } + + /// Resume downloading + fn continue_sync(&mut self, io: &mut SyncIo) { + let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty)).collect(); + peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating + for (p, _) in peers { + self.sync_peer(io, p, false); + } + } + + /// Called after all blocks have been donloaded + fn complete_sync(&mut self) { + trace!(target: "sync", "Sync complete"); + self.reset(); + self.state = SyncState::Idle; + } + + /// Enter waiting state + fn pause_sync(&mut self) { + trace!(target: "sync", "Block queue full, pausing sync"); + self.state = SyncState::Waiting; + } + + /// Find something to do for a peer. Called for a new peer or when a peer is done with it's task. + fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { + let (peer_latest, peer_difficulty) = { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + if peer.asking != PeerAsking::Nothing { + return; + } + if self.state == SyncState::Waiting { + trace!(target: "sync", "Waiting for block queue"); + return; + } + (peer.latest.clone(), peer.difficulty.clone()) + }; + + let td = io.chain().chain_info().pending_total_difficulty; + let syncing_difficulty = max(self.syncing_difficulty, td); + if force || peer_difficulty > syncing_difficulty { + // start sync + self.syncing_difficulty = peer_difficulty; + if self.state == SyncState::Idle || self.state == SyncState::NotSynced { + self.state = SyncState::Blocks; + } + trace!(target: "sync", "Starting sync with better chain"); + self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); + } + else if self.state == SyncState::Blocks { + self.request_blocks(io, peer_id); + } + } + + /// Find some headers or blocks to download for a peer. + fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId) { + self.clear_peer_download(peer_id); + + if io.chain().queue_info().full { + self.pause_sync(); + return; + } + + // check to see if we need to download any block bodies first + let mut needed_bodies: Vec = Vec::new(); + let mut needed_numbers: Vec = Vec::new(); + + if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.last_imported_block + 1 { + for (start, ref items) in self.headers.range_iter() { + if needed_bodies.len() > MAX_BODIES_TO_REQUEST { + break; + } + let mut index: BlockNumber = 0; + while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST { + let block = start + index; + if !self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block) { + needed_bodies.push(items[index as usize].hash.clone()); + needed_numbers.push(block); + self.downloading_bodies.insert(block); + } + index += 1; + } + } + } + if !needed_bodies.is_empty() { + replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers); + self.request_bodies(io, peer_id, needed_bodies); + } + else { + // check if need to download headers + let mut start = 0usize; + if !self.have_common_block { + // download backwards until common block is found 1 header at a time + let chain_info = io.chain().chain_info(); + start = chain_info.best_block_number as usize; + if !self.headers.is_empty() { + start = min(start, self.headers.range_iter().next().unwrap().0 as usize - 1); + } + if start == 0 { + self.have_common_block = true; //reached genesis + self.last_imported_hash = chain_info.genesis_hash; + } + } + if self.have_common_block { + let mut headers: Vec = Vec::new(); + let mut prev = self.last_imported_block + 1; + for (next, ref items) in self.headers.range_iter() { + if !headers.is_empty() { + break; + } + if next <= prev { + prev = next + items.len() as BlockNumber; + continue; + } + let mut block = prev; + while block < next && headers.len() <= MAX_HEADERS_TO_REQUEST { + if !self.downloading_headers.contains(&(block as BlockNumber)) { + headers.push(block as BlockNumber); + self.downloading_headers.insert(block as BlockNumber); + } + block += 1; + } + prev = next + items.len() as BlockNumber; + } + + if !headers.is_empty() { + start = headers[0] as usize; + let count = headers.len(); + replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers); + assert!(!self.headers.have_item(&(start as BlockNumber))); + self.request_headers_by_number(io, peer_id, start as BlockNumber, count, 0, false); + } + } + else { + self.request_headers_by_number(io, peer_id, start as BlockNumber, 1, 0, false); + } + } + } + + /// Clear all blocks/headers marked as being downloaded by a peer. + fn clear_peer_download(&mut self, peer_id: PeerId) { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + for b in &peer.asking_blocks { + self.downloading_headers.remove(&b); + self.downloading_bodies.remove(&b); + } + peer.asking_blocks.clear(); + } + + /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. + fn collect_blocks(&mut self, io: &mut SyncIo) { + if !self.have_common_block || self.headers.is_empty() || self.bodies.is_empty() { + return; + } + + let mut restart = false; + // merge headers and bodies + { + let headers = self.headers.range_iter().next().unwrap(); + let bodies = self.bodies.range_iter().next().unwrap(); + if headers.0 != bodies.0 || headers.0 != self.last_imported_block + 1 { + return; + } + + let count = min(headers.1.len(), bodies.1.len()); + let mut imported = 0; + for i in 0..count { + let mut block_rlp = RlpStream::new_list(3); + block_rlp.append_raw(&headers.1[i].data, 1); + let body = Rlp::new(&bodies.1[i]); + block_rlp.append_raw(body.at(0).as_raw(), 1); + block_rlp.append_raw(body.at(1).as_raw(), 1); + let h = &headers.1[i].hash; + match io.chain().import_block(block_rlp.out()) { + Err(ImportError::AlreadyInChain) => { + trace!(target: "sync", "Block already in chain {:?}", h); + self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = h.clone(); + }, + Err(ImportError::AlreadyQueued) => { + trace!(target: "sync", "Block already queued {:?}", h); + self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = h.clone(); + }, + Ok(_) => { + trace!(target: "sync", "Block queued {:?}", h); + self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = h.clone(); + imported += 1; + }, + Err(e) => { + debug!(target: "sync", "Bad block {:?} : {:?}", h, e); + restart = true; + } + } + } + trace!(target: "sync", "Imported {} of {}", imported, count); + } + + if restart { + self.restart(io); + return; + } + + self.headers.remove_head(&(self.last_imported_block + 1)); + self.bodies.remove_head(&(self.last_imported_block + 1)); + + if self.headers.is_empty() { + assert!(self.bodies.is_empty()); + self.complete_sync(); + } + } + + /// Remove downloaded bocks/headers starting from specified number. + /// Used to recover from an error and re-download parts of the chain detected as bad. + fn remove_downloaded_blocks(&mut self, start: BlockNumber) { + for n in self.headers.get_tail(&start) { + if let Some(ref header_data) = self.headers.find_item(&n) { + let header_to_delete = HeaderView::new(&header_data.data); + let header_id = HeaderId { + transactions_root: header_to_delete.transactions_root(), + uncles: header_to_delete.uncles_hash() + }; + self.header_ids.remove(&header_id); + } + self.downloading_bodies.remove(&n); + self.downloading_headers.remove(&n); + } + self.headers.remove_tail(&start); + self.bodies.remove_tail(&start); + } + + /// Request headers from a peer by block hash + fn request_headers_by_hash(&mut self, sync: &mut SyncIo, peer_id: PeerId, h: &H256, count: usize, skip: usize, reverse: bool) { + trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, h); + let mut rlp = RlpStream::new_list(4); + rlp.append(h); + rlp.append(&count); + rlp.append(&skip); + rlp.append(&if reverse {1u32} else {0u32}); + self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); + } + + /// Request headers from a peer by block number + fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool) { + let mut rlp = RlpStream::new_list(4); + trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, n); + rlp.append(&n); + rlp.append(&count); + rlp.append(&skip); + rlp.append(&if reverse {1u32} else {0u32}); + self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); + } + + /// Request block bodies from a peer + fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec) { + let mut rlp = RlpStream::new_list(hashes.len()); + trace!(target: "sync", "{} <- GetBlockBodies: {} entries", peer_id, hashes.len()); + for h in hashes { + rlp.append(&h); + } + self.send_request(sync, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out()); + } + + /// Reset peer status after request is complete. + fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + if peer.asking != asking { + warn!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); + } + else { + peer.asking = PeerAsking::Nothing; + } + } + + /// Generic request sender + fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { + { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + if peer.asking != PeerAsking::Nothing { + warn!(target:"sync", "Asking {:?} while requesting {:?}", asking, peer.asking); + } + } + match sync.send(peer_id, packet_id, packet) { + Err(e) => { + warn!(target:"sync", "Error sending request: {:?}", e); + sync.disable_peer(peer_id); + self.on_peer_aborting(sync, peer_id); + } + Ok(_) => { + let mut peer = self.peers.get_mut(&peer_id).unwrap(); + peer.asking = asking; + } + } + } + + /// Called when peer sends us new transactions + fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + Ok(()) + } + + /// Send Status message + fn send_status(&mut self, io: &mut SyncIo, peer_id: PeerId) { + let mut packet = RlpStream::new_list(5); + let chain = io.chain().chain_info(); + packet.append(&(PROTOCOL_VERSION as u32)); + packet.append(&NETWORK_ID); //TODO: network id + packet.append(&chain.total_difficulty); + packet.append(&chain.best_block_hash); + packet.append(&chain.genesis_hash); + //TODO: handle timeout for status request + if let Err(e) = io.send(peer_id, STATUS_PACKET, packet.out()) { + warn!(target:"sync", "Error sending status request: {:?}", e); + io.disable_peer(peer_id); + } + } + + /// Respond to GetBlockHeaders request + fn return_block_headers(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + // Packet layout: + // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] + let max_headers: usize = try!(r.val_at(1)); + let skip: usize = try!(r.val_at(2)); + let reverse: bool = try!(r.val_at(3)); + let last = io.chain().chain_info().best_block_number; + let mut number = if try!(r.at(0)).size() == 32 { + // id is a hash + let hash: H256 = try!(r.val_at(0)); + trace!(target: "sync", "-> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse); + match io.chain().block_header(&hash) { + Some(hdr) => From::from(HeaderView::new(&hdr).number()), + None => last + } + } + else { + trace!(target: "sync", "-> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", try!(r.val_at::(0)), max_headers, skip, reverse); + try!(r.val_at(0)) + }; + + if reverse { + number = min(last, number); + } else { + number = max(1, number); + } + let max_count = min(MAX_HEADERS_TO_SEND, max_headers); + let mut count = 0; + let mut data = Bytes::new(); + let inc = (skip + 1) as BlockNumber; + while number <= last && number > 0 && count < max_count { + if let Some(mut hdr) = io.chain().block_header_at(number) { + data.append(&mut hdr); + count += 1; + } + if reverse { + if number <= inc { + break; + } + number -= inc; + } + else { + number += inc; + } + } + let mut rlp = RlpStream::new_list(count as usize); + rlp.append_raw(&data, count as usize); + io.respond(BLOCK_HEADERS_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); + trace!(target: "sync", "-> GetBlockHeaders: returned {} entries", count); + Ok(()) + } + + /// Respond to GetBlockBodies request + fn return_block_bodies(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let mut count = r.item_count(); + if count == 0 { + debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); + return Ok(()); + } + trace!(target: "sync", "-> GetBlockBodies: {} entries", count); + count = min(count, MAX_BODIES_TO_SEND); + let mut added = 0usize; + let mut data = Bytes::new(); + for i in 0..count { + if let Some(mut hdr) = io.chain().block_body(&try!(r.val_at::(i))) { + data.append(&mut hdr); + added += 1; + } + } + let mut rlp = RlpStream::new_list(added); + rlp.append_raw(&data, added); + io.respond(BLOCK_BODIES_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); + trace!(target: "sync", "-> GetBlockBodies: returned {} entries", added); + Ok(()) + } + + /// Respond to GetNodeData request + fn return_node_data(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let mut count = r.item_count(); + if count == 0 { + debug!(target: "sync", "Empty GetNodeData request, ignoring."); + return Ok(()); + } + count = min(count, MAX_NODE_DATA_TO_SEND); + let mut added = 0usize; + let mut data = Bytes::new(); + for i in 0..count { + if let Some(mut hdr) = io.chain().state_data(&try!(r.val_at::(i))) { + data.append(&mut hdr); + added += 1; + } + } + let mut rlp = RlpStream::new_list(added); + rlp.append_raw(&data, added); + io.respond(NODE_DATA_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); + Ok(()) + } + + /// Respond to GetReceipts request + fn return_receipts(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let mut count = r.item_count(); + if count == 0 { + debug!(target: "sync", "Empty GetReceipts request, ignoring."); + return Ok(()); + } + count = min(count, MAX_RECEIPTS_TO_SEND); + let mut added = 0usize; + let mut data = Bytes::new(); + for i in 0..count { + if let Some(mut hdr) = io.chain().block_receipts(&try!(r.val_at::(i))) { + data.append(&mut hdr); + added += 1; + } + } + let mut rlp = RlpStream::new_list(added); + rlp.append_raw(&data, added); + io.respond(RECEIPTS_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); + Ok(()) + } + + /// Dispatch incoming requests and responses + pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { + let rlp = UntrustedRlp::new(data); + let result = match packet_id { + STATUS_PACKET => self.on_peer_status(io, peer, &rlp), + TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), + GET_BLOCK_HEADERS_PACKET => self.return_block_headers(io, &rlp), + BLOCK_HEADERS_PACKET => self.on_peer_block_headers(io, peer, &rlp), + GET_BLOCK_BODIES_PACKET => self.return_block_bodies(io, &rlp), + BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp), + NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp), + NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), + GET_NODE_DATA_PACKET => self.return_node_data(io, &rlp), + GET_RECEIPTS_PACKET => self.return_receipts(io, &rlp), + _ => { + debug!(target: "sync", "Unknown packet {}", packet_id); + Ok(()) + } + }; + result.unwrap_or_else(|e| { + debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); + }) + } + + /// Maintain other peers. Send out any new blocks and transactions + pub fn _maintain_sync(&mut self, _io: &mut SyncIo) { + } +} + diff --git a/sync/src/io.rs b/sync/src/io.rs new file mode 100644 index 000000000..4425a2555 --- /dev/null +++ b/sync/src/io.rs @@ -0,0 +1,62 @@ +use ethcore::client::BlockChainClient; +use util::{NetworkContext, PeerId, PacketId,}; +use util::error::UtilError; +use ethcore::service::SyncMessage; + +/// IO interface for the syning handler. +/// Provides peer connection management and an interface to the blockchain client. +// TODO: ratings +pub trait SyncIo { + /// Disable a peer + fn disable_peer(&mut self, peer_id: PeerId); + /// Respond to current request with a packet. Can be called from an IO handler for incoming packet. + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError>; + /// Send a packet to a peer. + fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError>; + /// Get the blockchain + fn chain(&self) -> &BlockChainClient; + /// Returns peer client identifier string + fn peer_info(&self, peer_id: PeerId) -> String { + peer_id.to_string() + } +} + +/// Wraps `NetworkContext` and the blockchain client +pub struct NetSyncIo<'s, 'h> where 'h: 's { + network: &'s NetworkContext<'h, SyncMessage>, + chain: &'s BlockChainClient +} + +impl<'s, 'h> NetSyncIo<'s, 'h> { + /// Creates a new instance from the `NetworkContext` and the blockchain client reference. + pub fn new(network: &'s NetworkContext<'h, SyncMessage>, chain: &'s BlockChainClient) -> NetSyncIo<'s, 'h> { + NetSyncIo { + network: network, + chain: chain, + } + } +} + +impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> { + fn disable_peer(&mut self, peer_id: PeerId) { + self.network.disable_peer(peer_id); + } + + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError>{ + self.network.respond(packet_id, data) + } + + fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError>{ + self.network.send(peer_id, packet_id, data) + } + + fn chain(&self) -> &BlockChainClient { + self.chain + } + + fn peer_info(&self, peer_id: PeerId) -> String { + self.network.peer_info(peer_id) + } +} + + diff --git a/sync/src/lib.rs b/sync/src/lib.rs new file mode 100644 index 000000000..09f3eb521 --- /dev/null +++ b/sync/src/lib.rs @@ -0,0 +1,106 @@ +#![warn(missing_docs)] +#![feature(plugin)] +#![plugin(clippy)] +#![feature(augmented_assignments)] +//! Blockchain sync module +//! Implements ethereum protocol version 63 as specified here: +//! https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol +//! +//! Usage example: +//! +//! ```rust +//! extern crate ethcore_util as util; +//! extern crate ethcore; +//! extern crate ethsync; +//! use std::env; +//! use std::sync::Arc; +//! use util::network::{NetworkService, NetworkConfiguration}; +//! use ethcore::client::Client; +//! use ethsync::EthSync; +//! use ethcore::ethereum; +//! +//! fn main() { +//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); +//! let dir = env::temp_dir(); +//! let client = Client::new(ethereum::new_frontier(), &dir, service.io().channel()).unwrap(); +//! EthSync::register(&mut service, client); +//! } +//! ``` + +#[macro_use] +extern crate log; +#[macro_use] +extern crate ethcore_util as util; +extern crate ethcore; +extern crate env_logger; + +use std::ops::*; +use std::sync::*; +use ethcore::client::Client; +use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; +use chain::ChainSync; +use ethcore::service::SyncMessage; +use io::NetSyncIo; + +mod chain; +mod io; +mod range_collection; + +#[cfg(test)] +mod tests; + +/// Ethereum network protocol handler +pub struct EthSync { + /// Shared blockchain client. TODO: this should evetually become an IPC endpoint + chain: Arc, + /// Sync strategy + sync: RwLock +} + +pub use self::chain::SyncStatus; + +impl EthSync { + /// Creates and register protocol with the network service + pub fn register(service: &mut NetworkService, chain: Arc) -> Arc { + let sync = Arc::new(EthSync { + chain: chain, + sync: RwLock::new(ChainSync::new()), + }); + service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); + sync + } + + /// Get sync status + pub fn status(&self) -> SyncStatus { + self.sync.read().unwrap().status() + } + + /// Stop sync + pub fn stop(&mut self, io: &mut NetworkContext) { + self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref())); + } + + /// Restart sync + pub fn restart(&mut self, io: &mut NetworkContext) { + self.sync.write().unwrap().restart(&mut NetSyncIo::new(io, self.chain.deref())); + } +} + +impl NetworkProtocolHandler for EthSync { + fn initialize(&self, _io: &NetworkContext) { + } + + fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { + self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data); + } + + fn connected(&self, io: &NetworkContext, peer: &PeerId) { + self.sync.write().unwrap().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer); + } + + fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { + self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer); + } +} + + diff --git a/sync/src/range_collection.rs b/sync/src/range_collection.rs new file mode 100644 index 000000000..b8186e5a5 --- /dev/null +++ b/sync/src/range_collection.rs @@ -0,0 +1,260 @@ +/// This module defines a trait for a collection of ranged values and an implementation +/// for this trait over sorted vector. + +use std::ops::{Add, Sub, Range}; + +pub trait ToUsize { + fn to_usize(&self) -> usize; +} + +pub trait FromUsize { + fn from_usize(s: usize) -> Self; +} + +/// A key-value collection orderd by key with sequential key-value pairs grouped together. +/// Such group is called a range. +/// E.g. a set of collection of 5 pairs {1, a}, {2, b}, {10, x}, {11, y}, {12, z} will be grouped into two ranges: {1, [a,b]}, {10, [x,y,z]} +pub trait RangeCollection { + /// Check if the given key is present in the collection. + fn have_item(&self, key: &K) -> bool; + /// Get value by key. + fn find_item(&self, key: &K) -> Option<&V>; + /// Get a range of keys from `key` till the end of the range that has `key` + /// Returns an empty range is key does not exist. + fn get_tail(&mut self, key: &K) -> Range; + /// Remove all elements < `start` in the range that contains `start` - 1 + fn remove_head(&mut self, start: &K); + /// Remove all elements >= `start` in the range that contains `start` + fn remove_tail(&mut self, start: &K); + /// Remove all elements >= `tail` + fn insert_item(&mut self, key: K, value: V); + /// Get an iterator over ranges + fn range_iter(& self) -> RangeIterator; +} + +/// Range iterator. For each range yelds a key for the first element of the range and a vector of values. +pub struct RangeIterator<'c, K:'c, V:'c> { + range: usize, + collection: &'c Vec<(K, Vec)> +} + +impl<'c, K:'c, V:'c> Iterator for RangeIterator<'c, K, V> where K: Add + FromUsize + ToUsize + Copy { + type Item = (K, &'c [V]); + // The 'Iterator' trait only requires the 'next' method to be defined. The + // return type is 'Option', 'None' is returned when the 'Iterator' is + // over, otherwise the next value is returned wrapped in 'Some' + fn next(&mut self) -> Option<(K, &'c [V])> { + if self.range > 0 { + self.range -= 1; + } + else { + return None; + } + match self.collection.get(self.range) { + Some(&(ref k, ref vec)) => { + Some((*k, &vec)) + }, + None => None + } + } +} + +impl RangeCollection for Vec<(K, Vec)> where K: Ord + PartialEq + Add + Sub + Copy + FromUsize + ToUsize { + fn range_iter(&self) -> RangeIterator { + RangeIterator { + range: self.len(), + collection: self + } + } + + fn have_item(&self, key: &K) -> bool { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(_) => true, + Err(index) => match self.get(index) { + Some(&(ref k, ref v)) => k <= key && (*k + FromUsize::from_usize(v.len())) > *key, + _ => false + }, + } + } + + fn find_item(&self, key: &K) -> Option<&V> { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => self.get(index).unwrap().1.get(0), + Err(index) => match self.get(index) { + Some(&(ref k, ref v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => v.get((*key - *k).to_usize()), + _ => None + }, + } + } + + fn get_tail(&mut self, key: &K) -> Range { + let kv = *key; + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => kv..(kv + FromUsize::from_usize(self[index].1.len())), + Err(index) => { + match self.get_mut(index) { + Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { + kv..(*k + FromUsize::from_usize(v.len())) + } + _ => kv..kv + } + }, + } + } + /// Remove element key and following elements in the same range + fn remove_tail(&mut self, key: &K) { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => { self.remove(index); }, + Err(index) =>{ + let mut empty = false; + match self.get_mut(index) { + Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { + v.truncate((*key - *k).to_usize()); + empty = v.is_empty(); + } + _ => {} + } + if empty { + self.remove(index); + } + }, + } + } + + /// Remove range elements up to key + fn remove_head(&mut self, key: &K) { + if *key == FromUsize::from_usize(0) { + return + } + + let prev = *key - FromUsize::from_usize(1); + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(_) => { }, //start of range, do nothing. + Err(index) => { + let mut empty = false; + match self.get_mut(index) { + Some(&mut (ref mut k, ref mut v)) if *k <= prev && (*k + FromUsize::from_usize(v.len())) > prev => { + let tail = v.split_off((*key - *k).to_usize()); + empty = tail.is_empty(); + let removed = ::std::mem::replace(v, tail); + let new_k = *k + FromUsize::from_usize(removed.len()); + ::std::mem::replace(k, new_k); + } + _ => {} + } + if empty { + self.remove(index); + } + }, + } + } + + fn insert_item(&mut self, key: K, value: V) { + assert!(!self.have_item(&key)); + + let lower = match self.binary_search_by(|&(k, _)| k.cmp(&key).reverse()) { + Ok(index) => index, + Err(index) => index, + }; + + let mut to_remove: Option = None; + if lower < self.len() && self[lower].0 + FromUsize::from_usize(self[lower].1.len()) == key { + // extend into existing chunk + self[lower].1.push(value); + } + else { + // insert a new chunk + let range: Vec = vec![value]; + self.insert(lower, (key, range)); + }; + if lower > 0 { + let next = lower - 1; + if next < self.len() + { + { + let (mut next, mut inserted) = self.split_at_mut(lower); + let mut next = next.last_mut().unwrap(); + let mut inserted = inserted.first_mut().unwrap(); + if next.0 == key + FromUsize::from_usize(1) + { + inserted.1.append(&mut next.1); + to_remove = Some(lower - 1); + } + } + + if let Some(r) = to_remove { + self.remove(r); + } + } + } + } +} + +#[test] +#[allow(cyclomatic_complexity)] +fn test_range() { + use std::cmp::{Ordering}; + + let mut ranges: Vec<(u64, Vec)> = Vec::new(); + assert_eq!(ranges.range_iter().next(), None); + assert_eq!(ranges.find_item(&1), None); + assert!(!ranges.have_item(&1)); + assert_eq!(ranges.get_tail(&0), 0..0); + + ranges.insert_item(17, 'q'); + assert_eq!(ranges.range_iter().cmp(vec![(17, &['q'][..])]), Ordering::Equal); + assert_eq!(ranges.find_item(&17), Some(&'q')); + assert!(ranges.have_item(&17)); + assert_eq!(ranges.get_tail(&17), 17..18); + + ranges.insert_item(18, 'r'); + assert_eq!(ranges.range_iter().cmp(vec![(17, &['q', 'r'][..])]), Ordering::Equal); + assert_eq!(ranges.find_item(&18), Some(&'r')); + assert!(ranges.have_item(&18)); + assert_eq!(ranges.get_tail(&17), 17..19); + + ranges.insert_item(16, 'p'); + assert_eq!(ranges.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal); + assert_eq!(ranges.find_item(&16), Some(&'p')); + assert_eq!(ranges.find_item(&17), Some(&'q')); + assert_eq!(ranges.find_item(&18), Some(&'r')); + assert!(ranges.have_item(&16)); + assert_eq!(ranges.get_tail(&17), 17..19); + assert_eq!(ranges.get_tail(&16), 16..19); + + ranges.insert_item(2, 'b'); + assert_eq!(ranges.range_iter().cmp(vec![(2, &['b'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + assert_eq!(ranges.find_item(&2), Some(&'b')); + + ranges.insert_item(3, 'c'); + ranges.insert_item(4, 'd'); + assert_eq!(ranges.get_tail(&3), 3..5); + assert_eq!(ranges.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + + let mut r = ranges.clone(); + r.remove_head(&1); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&2); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&3); + assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&10); + assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&5); + assert_eq!(r.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&19); + assert_eq!(r.range_iter().next(), None); + + let mut r = ranges.clone(); + r.remove_tail(&20); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_tail(&17); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal); + r.remove_tail(&16); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal); + r.remove_tail(&3); + assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal); + r.remove_tail(&2); + assert_eq!(r.range_iter().next(), None); +} + diff --git a/sync/src/service.rs b/sync/src/service.rs new file mode 100644 index 000000000..8c900d20a --- /dev/null +++ b/sync/src/service.rs @@ -0,0 +1,104 @@ +use util::*; +use sync::*; +use spec::Spec; +use error::*; +use std::env; +use client::Client; + +/// Message type for external and internal events +#[derive(Clone)] +pub enum SyncMessage { + /// New block has been imported into the blockchain + NewChainBlock(Bytes), //TODO: use Cow + /// A block is ready + BlockVerified, +} + +/// TODO [arkpar] Please document me +pub type NetSyncMessage = NetworkIoMessage; + +/// Client service setup. Creates and registers client and network services with the IO subsystem. +pub struct ClientService { + net_service: NetworkService, + client: Arc, + sync: Arc, +} + +impl ClientService { + /// Start the service in a separate thread. + pub fn start(spec: Spec, net_config: NetworkConfiguration) -> Result { + let mut net_service = try!(NetworkService::start(net_config)); + info!("Starting {}", net_service.host_info()); + info!("Configured for {} using {} engine", spec.name, spec.engine_name); + let mut dir = env::home_dir().unwrap(); + dir.push(".parity"); + dir.push(H64::from(spec.genesis_header().hash()).hex()); + let client = try!(Client::new(spec, &dir, net_service.io().channel())); + let sync = EthSync::register(&mut net_service, client.clone()); + let client_io = Arc::new(ClientIoHandler { + client: client.clone() + }); + try!(net_service.io().register_handler(client_io)); + + Ok(ClientService { + net_service: net_service, + client: client, + sync: sync, + }) + } + + /// Get the network service. + pub fn add_node(&mut self, _enode: &str) { + unimplemented!(); + } + + /// TODO [arkpar] Please document me + pub fn io(&mut self) -> &mut IoService { + self.net_service.io() + } + + /// TODO [arkpar] Please document me + pub fn client(&self) -> Arc { + self.client.clone() + + } + + /// Get shared sync handler + pub fn sync(&self) -> Arc { + self.sync.clone() + } +} + +/// IO interface for the Client handler +struct ClientIoHandler { + client: Arc +} + +const CLIENT_TICK_TIMER: TimerToken = 0; +const CLIENT_TICK_MS: u64 = 5000; + +impl IoHandler for ClientIoHandler { + fn initialize(&self, io: &IoContext) { + io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer"); + } + + fn timeout(&self, _io: &IoContext, timer: TimerToken) { + if timer == CLIENT_TICK_TIMER { + self.client.tick(); + } + } + + #[allow(match_ref_pats)] + #[allow(single_match)] + fn message(&self, io: &IoContext, net_message: &NetSyncMessage) { + if let &UserMessage(ref message) = net_message { + match message { + &SyncMessage::BlockVerified => { + self.client.import_verified_blocks(&io.channel()); + }, + _ => {}, // ignore other messages + } + } + } +} + diff --git a/sync/src/tests.rs b/sync/src/tests.rs new file mode 100644 index 000000000..070ae566d --- /dev/null +++ b/sync/src/tests.rs @@ -0,0 +1,349 @@ +use util::*; +use ethcore::client::{BlockChainClient, BlockStatus, TreeRoute, BlockChainInfo}; +use ethcore::block_queue::BlockQueueInfo; +use ethcore::header::{Header as BlockHeader, BlockNumber}; +use ethcore::error::*; +use io::SyncIo; +use chain::ChainSync; + +struct TestBlockChainClient { + blocks: RwLock>, + numbers: RwLock>, + genesis_hash: H256, + last_hash: RwLock, + difficulty: RwLock, +} + +impl TestBlockChainClient { + fn new() -> TestBlockChainClient { + + let mut client = TestBlockChainClient { + blocks: RwLock::new(HashMap::new()), + numbers: RwLock::new(HashMap::new()), + genesis_hash: H256::new(), + last_hash: RwLock::new(H256::new()), + difficulty: RwLock::new(From::from(0)), + }; + client.add_blocks(1, true); // add genesis block + client.genesis_hash = client.last_hash.read().unwrap().clone(); + client + } + + pub fn add_blocks(&mut self, count: usize, empty: bool) { + let len = self.numbers.read().unwrap().len(); + for n in len..(len + count) { + let mut header = BlockHeader::new(); + header.difficulty = From::from(n); + header.parent_hash = self.last_hash.read().unwrap().clone(); + header.number = n as BlockNumber; + let mut uncles = RlpStream::new_list(if empty {0} else {1}); + if !empty { + uncles.append(&H256::from(&U256::from(n))); + header.uncles_hash = uncles.as_raw().sha3(); + } + let mut rlp = RlpStream::new_list(3); + rlp.append(&header); + rlp.append_raw(&rlp::NULL_RLP, 1); + rlp.append_raw(uncles.as_raw(), 1); + self.import_block(rlp.as_raw().to_vec()).unwrap(); + } + } +} + +impl BlockChainClient for TestBlockChainClient { + fn block_total_difficulty(&self, _h: &H256) -> Option { + unimplemented!(); + } + + fn block_header(&self, h: &H256) -> Option { + self.blocks.read().unwrap().get(h).map(|r| Rlp::new(r).at(0).as_raw().to_vec()) + + } + + fn block_body(&self, h: &H256) -> Option { + self.blocks.read().unwrap().get(h).map(|r| { + let mut stream = RlpStream::new_list(2); + stream.append_raw(Rlp::new(&r).at(1).as_raw(), 1); + stream.append_raw(Rlp::new(&r).at(2).as_raw(), 1); + stream.out() + }) + } + + fn block(&self, h: &H256) -> Option { + self.blocks.read().unwrap().get(h).cloned() + } + + fn block_status(&self, h: &H256) -> BlockStatus { + match self.blocks.read().unwrap().get(h) { + Some(_) => BlockStatus::InChain, + None => BlockStatus::Unknown + } + } + + fn block_total_difficulty_at(&self, _number: BlockNumber) -> Option { + unimplemented!(); + } + + fn block_header_at(&self, n: BlockNumber) -> Option { + self.numbers.read().unwrap().get(&(n as usize)).and_then(|h| self.block_header(h)) + } + + fn block_body_at(&self, n: BlockNumber) -> Option { + self.numbers.read().unwrap().get(&(n as usize)).and_then(|h| self.block_body(h)) + } + + fn block_at(&self, n: BlockNumber) -> Option { + self.numbers.read().unwrap().get(&(n as usize)).map(|h| self.blocks.read().unwrap().get(h).unwrap().clone()) + } + + fn block_status_at(&self, n: BlockNumber) -> BlockStatus { + if (n as usize) < self.blocks.read().unwrap().len() { + BlockStatus::InChain + } else { + BlockStatus::Unknown + } + } + + fn tree_route(&self, _from: &H256, _to: &H256) -> Option { + Some(TreeRoute { + blocks: Vec::new(), + ancestor: H256::new(), + index: 0 + }) + } + + fn state_data(&self, _h: &H256) -> Option { + None + } + + fn block_receipts(&self, _h: &H256) -> Option { + None + } + + fn import_block(&self, b: Bytes) -> ImportResult { + let header = Rlp::new(&b).val_at::(0); + let h = header.hash(); + let number: usize = header.number as usize; + if number > self.blocks.read().unwrap().len() { + panic!("Unexpected block number. Expected {}, got {}", self.blocks.read().unwrap().len(), number); + } + if number > 0 { + match self.blocks.read().unwrap().get(&header.parent_hash) { + Some(parent) => { + let parent = Rlp::new(parent).val_at::(0); + if parent.number != (header.number - 1) { + panic!("Unexpected block parent"); + } + }, + None => { + panic!("Unknown block parent {:?} for block {}", header.parent_hash, number); + } + } + } + let len = self.numbers.read().unwrap().len(); + if number == len { + *self.difficulty.write().unwrap().deref_mut() += header.difficulty; + mem::replace(self.last_hash.write().unwrap().deref_mut(), h.clone()); + self.blocks.write().unwrap().insert(h.clone(), b); + self.numbers.write().unwrap().insert(number, h.clone()); + let mut parent_hash = header.parent_hash; + if number > 0 { + let mut n = number - 1; + while n > 0 && self.numbers.read().unwrap()[&n] != parent_hash { + *self.numbers.write().unwrap().get_mut(&n).unwrap() = parent_hash.clone(); + n -= 1; + parent_hash = Rlp::new(&self.blocks.read().unwrap()[&parent_hash]).val_at::(0).parent_hash; + } + } + } + else { + self.blocks.write().unwrap().insert(h.clone(), b.to_vec()); + } + Ok(h) + } + + fn queue_info(&self) -> BlockQueueInfo { + BlockQueueInfo { + full: false, + verified_queue_size: 0, + unverified_queue_size: 0, + verifying_queue_size: 0, + } + } + + fn clear_queue(&self) { + } + + fn chain_info(&self) -> BlockChainInfo { + BlockChainInfo { + total_difficulty: *self.difficulty.read().unwrap(), + pending_total_difficulty: *self.difficulty.read().unwrap(), + genesis_hash: self.genesis_hash.clone(), + best_block_hash: self.last_hash.read().unwrap().clone(), + best_block_number: self.blocks.read().unwrap().len() as BlockNumber - 1, + } + } +} + +struct TestIo<'p> { + chain: &'p mut TestBlockChainClient, + queue: &'p mut VecDeque, + sender: Option, +} + +impl<'p> TestIo<'p> { + fn new(chain: &'p mut TestBlockChainClient, queue: &'p mut VecDeque, sender: Option) -> TestIo<'p> { + TestIo { + chain: chain, + queue: queue, + sender: sender + } + } +} + +impl<'p> SyncIo for TestIo<'p> { + fn disable_peer(&mut self, _peer_id: PeerId) { + } + + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { + self.queue.push_back(TestPacket { + data: data, + packet_id: packet_id, + recipient: self.sender.unwrap() + }); + Ok(()) + } + + fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { + self.queue.push_back(TestPacket { + data: data, + packet_id: packet_id, + recipient: peer_id, + }); + Ok(()) + } + + fn chain(&self) -> &BlockChainClient { + self.chain + } +} + +struct TestPacket { + data: Bytes, + packet_id: PacketId, + recipient: PeerId, +} + +struct TestPeer { + chain: TestBlockChainClient, + sync: ChainSync, + queue: VecDeque, +} + +struct TestNet { + peers: Vec +} + +impl TestNet { + pub fn new(n: usize) -> TestNet { + let mut net = TestNet { + peers: Vec::new(), + }; + for _ in 0..n { + net.peers.push(TestPeer { + chain: TestBlockChainClient::new(), + sync: ChainSync::new(), + queue: VecDeque::new(), + }); + } + net + } + + pub fn peer(&self, i: usize) -> &TestPeer { + self.peers.get(i).unwrap() + } + + pub fn peer_mut(&mut self, i: usize) -> &mut TestPeer { + self.peers.get_mut(i).unwrap() + } + + pub fn start(&mut self) { + for peer in 0..self.peers.len() { + for client in 0..self.peers.len() { + if peer != client { + let mut p = self.peers.get_mut(peer).unwrap(); + p.sync.on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId); + } + } + } + } + + pub fn sync_step(&mut self) { + for peer in 0..self.peers.len() { + if let Some(packet) = self.peers[peer].queue.pop_front() { + let mut p = self.peers.get_mut(packet.recipient).unwrap(); + trace!("--- {} -> {} ---", peer, packet.recipient); + p.sync.on_packet(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data); + trace!("----------------"); + } + let mut p = self.peers.get_mut(peer).unwrap(); + p.sync._maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); + } + } + + pub fn sync(&mut self) { + self.start(); + while !self.done() { + self.sync_step() + } + } + + pub fn done(&self) -> bool { + self.peers.iter().all(|p| p.queue.is_empty()) + } +} + + +#[test] +fn full_sync_two_peers() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + net.peer_mut(1).chain.add_blocks(1000, false); + net.peer_mut(2).chain.add_blocks(1000, false); + net.sync(); + assert!(net.peer(0).chain.block_at(1000).is_some()); + assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); +} + +#[test] +fn full_sync_empty_blocks() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + for n in 0..200 { + net.peer_mut(1).chain.add_blocks(5, n % 2 == 0); + net.peer_mut(2).chain.add_blocks(5, n % 2 == 0); + } + net.sync(); + assert!(net.peer(0).chain.block_at(1000).is_some()); + assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); +} + +#[test] +fn forked_sync() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + net.peer_mut(0).chain.add_blocks(300, false); + net.peer_mut(1).chain.add_blocks(300, false); + net.peer_mut(2).chain.add_blocks(300, false); + net.peer_mut(0).chain.add_blocks(100, true); //fork + net.peer_mut(1).chain.add_blocks(200, false); + net.peer_mut(2).chain.add_blocks(200, false); + net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2 + net.peer_mut(2).chain.add_blocks(10, true); + // peer 1 has the best chain of 601 blocks + let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); + net.sync(); + assert_eq!(net.peer(0).chain.numbers.read().unwrap().deref(), &peer1_chain); + assert_eq!(net.peer(1).chain.numbers.read().unwrap().deref(), &peer1_chain); + assert_eq!(net.peer(2).chain.numbers.read().unwrap().deref(), &peer1_chain); +} diff --git a/src/sync/tests.rs b/sync/tests.rs similarity index 100% rename from src/sync/tests.rs rename to sync/tests.rs From 61f39d68f859a7bfdb99a660778605574c993ed4 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 29 Jan 2016 16:38:03 +0100 Subject: [PATCH 2/3] Block validation --- sync/src/chain.rs | 7 +++++++ sync/src/tests.rs | 6 +++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 3c31315f2..13158f95a 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -20,6 +20,7 @@ use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::client::{BlockChainClient, BlockStatus}; use range_collection::{RangeCollection, ToUsize, FromUsize}; use ethcore::error::*; +use ethcore::block::Block; use io::SyncIo; impl ToUsize for BlockNumber { @@ -669,6 +670,12 @@ impl ChainSync { block_rlp.append_raw(body.at(0).as_raw(), 1); block_rlp.append_raw(body.at(1).as_raw(), 1); let h = &headers.1[i].hash; + // Perform basic block verification + if !Block::is_good(block_rlp.as_raw()) { + debug!(target: "sync", "Bad block rlp {:?} : {:?}", h, block_rlp.as_raw()); + restart = true; + break; + } match io.chain().import_block(block_rlp.out()) { Err(ImportError::AlreadyInChain) => { trace!(target: "sync", "Block already in chain {:?}", h); diff --git a/sync/src/tests.rs b/sync/src/tests.rs index 070ae566d..41516ef60 100644 --- a/sync/src/tests.rs +++ b/sync/src/tests.rs @@ -38,7 +38,11 @@ impl TestBlockChainClient { header.number = n as BlockNumber; let mut uncles = RlpStream::new_list(if empty {0} else {1}); if !empty { - uncles.append(&H256::from(&U256::from(n))); + let mut uncle_header = BlockHeader::new(); + uncle_header.difficulty = From::from(n); + uncle_header.parent_hash = self.last_hash.read().unwrap().clone(); + uncle_header.number = n as BlockNumber; + uncles.append(&uncle_header); header.uncles_hash = uncles.as_raw().sha3(); } let mut rlp = RlpStream::new_list(3); From 15fb62f176ae3fe6a90e49e25cb125c2d73ac0c6 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 29 Jan 2016 17:17:21 +0100 Subject: [PATCH 3/3] Removed dup files --- sync/chain.rs | 976 --------------------------------------- sync/io.rs | 62 --- sync/mod.rs | 94 ---- sync/range_collection.rs | 260 ----------- sync/tests.rs | 406 ---------------- 5 files changed, 1798 deletions(-) delete mode 100644 sync/chain.rs delete mode 100644 sync/io.rs delete mode 100644 sync/mod.rs delete mode 100644 sync/range_collection.rs delete mode 100644 sync/tests.rs diff --git a/sync/chain.rs b/sync/chain.rs deleted file mode 100644 index 3eaf69cb1..000000000 --- a/sync/chain.rs +++ /dev/null @@ -1,976 +0,0 @@ -/// -/// BlockChain synchronization strategy. -/// Syncs to peers and keeps up to date. -/// This implementation uses ethereum protocol v63 -/// -/// Syncing strategy. -/// -/// 1. A peer arrives with a total difficulty better than ours -/// 2. Find a common best block between our an peer chain. -/// Start with out best block and request headers from peer backwards until a common block is found -/// 3. Download headers and block bodies from peers in parallel. -/// As soon as a set of the blocks is fully downloaded at the head of the queue it is fed to the blockchain -/// 4. Maintain sync by handling NewBlocks/NewHashes messages -/// - -use util::*; -use std::mem::{replace}; -use views::{HeaderView}; -use header::{BlockNumber, Header as BlockHeader}; -use client::{BlockChainClient, BlockStatus}; -use sync::range_collection::{RangeCollection, ToUsize, FromUsize}; -use error::*; -use sync::io::SyncIo; -use std::option::Option; - -impl ToUsize for BlockNumber { - fn to_usize(&self) -> usize { - *self as usize - } -} - -impl FromUsize for BlockNumber { - fn from_usize(s: usize) -> BlockNumber { - s as BlockNumber - } -} - -type PacketDecodeError = DecoderError; - -const PROTOCOL_VERSION: u8 = 63u8; -const MAX_BODIES_TO_SEND: usize = 256; -const MAX_HEADERS_TO_SEND: usize = 512; -const MAX_NODE_DATA_TO_SEND: usize = 1024; -const MAX_RECEIPTS_TO_SEND: usize = 1024; -const MAX_HEADERS_TO_REQUEST: usize = 512; -const MAX_BODIES_TO_REQUEST: usize = 256; - -const STATUS_PACKET: u8 = 0x00; -const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; -const TRANSACTIONS_PACKET: u8 = 0x02; -const GET_BLOCK_HEADERS_PACKET: u8 = 0x03; -const BLOCK_HEADERS_PACKET: u8 = 0x04; -const GET_BLOCK_BODIES_PACKET: u8 = 0x05; -const BLOCK_BODIES_PACKET: u8 = 0x06; -const NEW_BLOCK_PACKET: u8 = 0x07; - -const GET_NODE_DATA_PACKET: u8 = 0x0d; -const NODE_DATA_PACKET: u8 = 0x0e; -const GET_RECEIPTS_PACKET: u8 = 0x0f; -const RECEIPTS_PACKET: u8 = 0x10; - -const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent - -struct Header { - /// Header data - data: Bytes, - /// Block hash - hash: H256, - /// Parent hash - parent: H256, -} - -/// Used to identify header by transactions and uncles hashes -#[derive(Eq, PartialEq, Hash)] -struct HeaderId { - transactions_root: H256, - uncles: H256 -} - -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -/// Sync state -pub enum SyncState { - /// Initial chain sync has not started yet - NotSynced, - /// Initial chain sync complete. Waiting for new packets - Idle, - /// Block downloading paused. Waiting for block queue to process blocks and free some space - Waiting, - /// Downloading blocks - Blocks, - /// Downloading blocks learned from NewHashes packet - NewBlocks, -} - -/// Syncing status and statistics -pub struct SyncStatus { - /// State - pub state: SyncState, - /// Syncing protocol version. That's the maximum protocol version we connect to. - pub protocol_version: u8, - /// BlockChain height for the moment the sync started. - pub start_block_number: BlockNumber, - /// Last fully downloaded and imported block number (if any). - pub last_imported_block_number: Option, - /// Highest block number in the download queue (if any). - pub highest_block_number: Option, - /// Total number of blocks for the sync process. - pub blocks_total: BlockNumber, - /// Number of blocks downloaded so far. - pub blocks_received: BlockNumber, - /// Total number of connected peers - pub num_peers: usize, - /// Total number of active peers - pub num_active_peers: usize, -} - -#[derive(PartialEq, Eq, Debug)] -/// Peer data type requested -enum PeerAsking { - Nothing, - BlockHeaders, - BlockBodies, -} - -/// Syncing peer information -struct PeerInfo { - /// eth protocol version - protocol_version: u32, - /// Peer chain genesis hash - genesis: H256, - /// Peer network id - network_id: U256, - /// Peer best block hash - latest: H256, - /// Peer total difficulty - difficulty: U256, - /// Type of data currenty being requested from peer. - asking: PeerAsking, - /// A set of block numbers being requested - asking_blocks: Vec, -} - -/// Blockchain sync handler. -/// See module documentation for more details. -pub struct ChainSync { - /// Sync state - state: SyncState, - /// Last block number for the start of sync - starting_block: BlockNumber, - /// Highest block number seen - highest_block: Option, - /// Set of block header numbers being downloaded - downloading_headers: HashSet, - /// Set of block body numbers being downloaded - downloading_bodies: HashSet, - /// Downloaded headers. - headers: Vec<(BlockNumber, Vec
)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order - /// Downloaded bodies - bodies: Vec<(BlockNumber, Vec)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order - /// Peer info - peers: HashMap, - /// Used to map body to header - header_ids: HashMap, - /// Last impoted block number - last_imported_block: Option, - /// Last impoted block hash - last_imported_hash: Option, - /// Syncing total difficulty - syncing_difficulty: U256, - /// True if common block for our and remote chain has been found - have_common_block: bool, -} - - -impl ChainSync { - /// Create a new instance of syncing strategy. - pub fn new() -> ChainSync { - ChainSync { - state: SyncState::NotSynced, - starting_block: 0, - highest_block: None, - downloading_headers: HashSet::new(), - downloading_bodies: HashSet::new(), - headers: Vec::new(), - bodies: Vec::new(), - peers: HashMap::new(), - header_ids: HashMap::new(), - last_imported_block: None, - last_imported_hash: None, - syncing_difficulty: U256::from(0u64), - have_common_block: false, - } - } - - /// @returns Synchonization status - pub fn status(&self) -> SyncStatus { - SyncStatus { - state: self.state.clone(), - protocol_version: 63, - start_block_number: self.starting_block, - last_imported_block_number: self.last_imported_block, - highest_block_number: self.highest_block, - blocks_received: match self.last_imported_block { None => 0, Some(x) => x - self.starting_block }, - blocks_total: match self.highest_block { None => 0, Some(x) => x - self.starting_block }, - num_peers: self.peers.len(), - num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), - } - } - - /// Abort all sync activity - pub fn abort(&mut self, io: &mut SyncIo) { - self.restart(io); - self.peers.clear(); - } - - /// Rest sync. Clear all downloaded data but keep the queue - fn reset(&mut self) { - self.downloading_headers.clear(); - self.downloading_bodies.clear(); - self.headers.clear(); - self.bodies.clear(); - for (_, ref mut p) in &mut self.peers { - p.asking_blocks.clear(); - } - self.header_ids.clear(); - self.syncing_difficulty = From::from(0u64); - self.state = SyncState::Idle; - } - - /// Restart sync - pub fn restart(&mut self, io: &mut SyncIo) { - self.reset(); - self.last_imported_block = None; - self.last_imported_hash = None; - self.starting_block = 0; - self.highest_block = None; - self.have_common_block = false; - io.chain().clear_queue(); - self.starting_block = io.chain().chain_info().best_block_number; - self.state = SyncState::NotSynced; - } - - /// Called by peer to report status - fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let peer = PeerInfo { - protocol_version: try!(r.val_at(0)), - network_id: try!(r.val_at(1)), - difficulty: try!(r.val_at(2)), - latest: try!(r.val_at(3)), - genesis: try!(r.val_at(4)), - asking: PeerAsking::Nothing, - asking_blocks: Vec::new(), - }; - - trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); - - let chain_info = io.chain().chain_info(); - if peer.genesis != chain_info.genesis_hash { - io.disable_peer(peer_id); - trace!(target: "sync", "Peer {} genesis hash not matched", peer_id); - return Ok(()); - } - if peer.network_id != NETWORK_ID { - io.disable_peer(peer_id); - trace!(target: "sync", "Peer {} network id not matched", peer_id); - return Ok(()); - } - - let old = self.peers.insert(peer_id.clone(), peer); - if old.is_some() { - panic!("ChainSync: new peer already exists"); - } - info!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); - self.sync_peer(io, peer_id, false); - Ok(()) - } - - #[allow(cyclomatic_complexity)] - /// Called by peer once it has new block headers during sync - fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders); - let item_count = r.item_count(); - trace!(target: "sync", "{} -> BlockHeaders ({} entries)", peer_id, item_count); - self.clear_peer_download(peer_id); - if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { - trace!(target: "sync", "Ignored unexpected block headers"); - return Ok(()); - } - if self.state == SyncState::Waiting { - trace!(target: "sync", "Ignored block headers while waiting"); - return Ok(()); - } - - for i in 0..item_count { - let info: BlockHeader = try!(r.val_at(i)); - let number = BlockNumber::from(info.number); - if number <= self.current_base_block() || self.headers.have_item(&number) { - trace!(target: "sync", "Skipping existing block header"); - continue; - } - - if self.highest_block == None || number > self.highest_block.unwrap() { - self.highest_block = Some(number); - } - let hash = info.hash(); - match io.chain().block_status(&hash) { - BlockStatus::InChain => { - self.have_common_block = true; - self.last_imported_block = Some(number); - self.last_imported_hash = Some(hash.clone()); - trace!(target: "sync", "Found common header {} ({})", number, hash); - }, - _ => { - if self.have_common_block { - //validate chain - let base_hash = self.last_imported_hash.clone().unwrap(); - if self.have_common_block && number == self.current_base_block() + 1 && info.parent_hash != base_hash { - // TODO: lower peer rating - debug!(target: "sync", "Mismatched block header {} {}", number, hash); - continue; - } - if self.headers.find_item(&(number - 1)).map_or(false, |p| p.hash != info.parent_hash) { - // mismatching parent id, delete the previous block and don't add this one - // TODO: lower peer rating - debug!(target: "sync", "Mismatched block header {} {}", number, hash); - self.remove_downloaded_blocks(number - 1); - continue; - } - if self.headers.find_item(&(number + 1)).map_or(false, |p| p.parent != hash) { - // mismatching parent id for the next block, clear following headers - debug!(target: "sync", "Mismatched block header {}", number + 1); - self.remove_downloaded_blocks(number + 1); - } - } - let hdr = Header { - data: try!(r.at(i)).as_raw().to_vec(), - hash: hash.clone(), - parent: info.parent_hash, - }; - self.headers.insert_item(number, hdr); - let header_id = HeaderId { - transactions_root: info.transactions_root, - uncles: info.uncles_hash - }; - trace!(target: "sync", "Got header {} ({})", number, hash); - if header_id.transactions_root == rlp::SHA3_NULL_RLP && header_id.uncles == rlp::SHA3_EMPTY_LIST_RLP { - //empty body, just mark as downloaded - let mut body_stream = RlpStream::new_list(2); - body_stream.append_raw(&rlp::NULL_RLP, 1); - body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1); - self.bodies.insert_item(number, body_stream.out()); - } - else { - self.header_ids.insert(header_id, number); - } - } - } - } - self.collect_blocks(io); - self.continue_sync(io); - Ok(()) - } - - /// Called by peer once it has new block bodies - fn on_peer_block_bodies(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - use util::triehash::ordered_trie_root; - self.reset_peer_asking(peer_id, PeerAsking::BlockBodies); - let item_count = r.item_count(); - trace!(target: "sync", "{} -> BlockBodies ({} entries)", peer_id, item_count); - self.clear_peer_download(peer_id); - if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { - trace!(target: "sync", "Ignored unexpected block bodies"); - return Ok(()); - } - if self.state == SyncState::Waiting { - trace!(target: "sync", "Ignored block bodies while waiting"); - return Ok(()); - } - for i in 0..item_count { - let body = try!(r.at(i)); - let tx = try!(body.at(0)); - let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw().to_vec()).collect()); //TODO: get rid of vectors here - let uncles = try!(body.at(1)).as_raw().sha3(); - let header_id = HeaderId { - transactions_root: tx_root, - uncles: uncles - }; - match self.header_ids.get(&header_id).cloned() { - Some(n) => { - self.header_ids.remove(&header_id); - self.bodies.insert_item(n, body.as_raw().to_vec()); - trace!(target: "sync", "Got body {}", n); - } - None => { - debug!(target: "sync", "Ignored unknown block body"); - } - } - } - self.collect_blocks(io); - self.continue_sync(io); - Ok(()) - } - - /// Called by peer once it has new block bodies - fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let block_rlp = try!(r.at(0)); - let header_rlp = try!(block_rlp.at(0)); - let h = header_rlp.as_raw().sha3(); - - trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); - let header_view = HeaderView::new(header_rlp.as_raw()); - // TODO: Decompose block and add to self.headers and self.bodies instead - if header_view.number() == From::from(self.current_base_block() + 1) { - match io.chain().import_block(block_rlp.as_raw().to_vec()) { - Err(ImportError::AlreadyInChain) => { - trace!(target: "sync", "New block already in chain {:?}", h); - }, - Err(ImportError::AlreadyQueued) => { - trace!(target: "sync", "New block already queued {:?}", h); - }, - Ok(_) => { - trace!(target: "sync", "New block queued {:?}", h); - }, - Err(e) => { - debug!(target: "sync", "Bad new block {:?} : {:?}", h, e); - io.disable_peer(peer_id); - } - }; - } - else { - trace!(target: "sync", "New block unknown {:?}", h); - //TODO: handle too many unknown blocks - let difficulty: U256 = try!(r.val_at(1)); - let peer_difficulty = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").difficulty; - if difficulty > peer_difficulty { - trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); - { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - peer.latest = header_view.sha3(); - } - self.sync_peer(io, peer_id, true); - } - } - Ok(()) - } - - /// Handles NewHashes packet. Initiates headers download for any unknown hashes. - fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - if self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking != PeerAsking::Nothing { - trace!(target: "sync", "Ignoring new hashes since we're already downloading."); - return Ok(()); - } - trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()); - let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); - let mut max_height: U256 = From::from(0); - for (rh, rd) in hashes { - let h = try!(rh); - let d = try!(rd); - match io.chain().block_status(&h) { - BlockStatus::InChain => { - trace!(target: "sync", "New block hash already in chain {:?}", h); - }, - BlockStatus::Queued => { - trace!(target: "sync", "New hash block already queued {:?}", h); - }, - BlockStatus::Unknown => { - trace!(target: "sync", "New unknown block hash {:?}", h); - if d > max_height { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - peer.latest = h.clone(); - max_height = d; - } - }, - BlockStatus::Bad =>{ - debug!(target: "sync", "Bad new block hash {:?}", h); - io.disable_peer(peer_id); - return Ok(()); - } - } - }; - if max_height != x!(0) { - self.sync_peer(io, peer_id, true); - } - Ok(()) - } - - /// Called by peer when it is disconnecting - pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { - trace!(target: "sync", "== Disconnecting {}", peer); - if self.peers.contains_key(&peer) { - info!(target: "sync", "Disconnected {}:{}", peer, io.peer_info(peer)); - self.clear_peer_download(peer); - self.peers.remove(&peer); - self.continue_sync(io); - } - } - - /// Called when a new peer is connected - pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) { - trace!(target: "sync", "== Connected {}", peer); - self.send_status(io, peer); - } - - /// Resume downloading - fn continue_sync(&mut self, io: &mut SyncIo) { - let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty)).collect(); - peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating - for (p, _) in peers { - self.sync_peer(io, p, false); - } - } - - /// Called after all blocks have been donloaded - fn complete_sync(&mut self) { - trace!(target: "sync", "Sync complete"); - self.reset(); - self.state = SyncState::Idle; - } - - /// Enter waiting state - fn pause_sync(&mut self) { - trace!(target: "sync", "Block queue full, pausing sync"); - self.state = SyncState::Waiting; - } - - /// Find something to do for a peer. Called for a new peer or when a peer is done with it's task. - fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { - let (peer_latest, peer_difficulty) = { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - if peer.asking != PeerAsking::Nothing { - return; - } - if self.state == SyncState::Waiting { - trace!(target: "sync", "Waiting for block queue"); - return; - } - (peer.latest.clone(), peer.difficulty.clone()) - }; - - let td = io.chain().chain_info().pending_total_difficulty; - let syncing_difficulty = max(self.syncing_difficulty, td); - if force || peer_difficulty > syncing_difficulty { - // start sync - self.syncing_difficulty = peer_difficulty; - if self.state == SyncState::Idle || self.state == SyncState::NotSynced { - self.state = SyncState::Blocks; - } - trace!(target: "sync", "Starting sync with better chain"); - self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); - } - else if self.state == SyncState::Blocks { - self.request_blocks(io, peer_id); - } - } - - fn current_base_block(&self) -> BlockNumber { - match self.last_imported_block { None => 0, Some(x) => x } - } - - /// Find some headers or blocks to download for a peer. - fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId) { - self.clear_peer_download(peer_id); - - if io.chain().queue_info().full { - self.pause_sync(); - return; - } - - // check to see if we need to download any block bodies first - let mut needed_bodies: Vec = Vec::new(); - let mut needed_numbers: Vec = Vec::new(); - - if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.current_base_block() + 1 { - for (start, ref items) in self.headers.range_iter() { - if needed_bodies.len() > MAX_BODIES_TO_REQUEST { - break; - } - let mut index: BlockNumber = 0; - while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST { - let block = start + index; - if !self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block) { - needed_bodies.push(items[index as usize].hash.clone()); - needed_numbers.push(block); - self.downloading_bodies.insert(block); - } - index += 1; - } - } - } - if !needed_bodies.is_empty() { - replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers); - self.request_bodies(io, peer_id, needed_bodies); - } - else { - // check if need to download headers - let mut start = 0usize; - if !self.have_common_block { - // download backwards until common block is found 1 header at a time - let chain_info = io.chain().chain_info(); - start = chain_info.best_block_number as usize; - if !self.headers.is_empty() { - start = min(start, self.headers.range_iter().next().unwrap().0 as usize - 1); - } - if start == 0 { - self.have_common_block = true; //reached genesis - self.last_imported_hash = Some(chain_info.genesis_hash); - } - } - if self.have_common_block { - let mut headers: Vec = Vec::new(); - let mut prev = self.current_base_block() + 1; - for (next, ref items) in self.headers.range_iter() { - if !headers.is_empty() { - break; - } - if next <= prev { - prev = next + items.len() as BlockNumber; - continue; - } - let mut block = prev; - while block < next && headers.len() <= MAX_HEADERS_TO_REQUEST { - if !self.downloading_headers.contains(&(block as BlockNumber)) { - headers.push(block as BlockNumber); - self.downloading_headers.insert(block as BlockNumber); - } - block += 1; - } - prev = next + items.len() as BlockNumber; - } - - if !headers.is_empty() { - start = headers[0] as usize; - let count = headers.len(); - replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers); - assert!(!self.headers.have_item(&(start as BlockNumber))); - self.request_headers_by_number(io, peer_id, start as BlockNumber, count, 0, false); - } - } - else { - self.request_headers_by_number(io, peer_id, start as BlockNumber, 1, 0, false); - } - } - } - - /// Clear all blocks/headers marked as being downloaded by a peer. - fn clear_peer_download(&mut self, peer_id: PeerId) { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - for b in &peer.asking_blocks { - self.downloading_headers.remove(&b); - self.downloading_bodies.remove(&b); - } - peer.asking_blocks.clear(); - } - - /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. - fn collect_blocks(&mut self, io: &mut SyncIo) { - if !self.have_common_block || self.headers.is_empty() || self.bodies.is_empty() { - return; - } - - let mut restart = false; - // merge headers and bodies - { - let headers = self.headers.range_iter().next().unwrap(); - let bodies = self.bodies.range_iter().next().unwrap(); - if headers.0 != bodies.0 || headers.0 != self.current_base_block() + 1 { - return; - } - - let count = min(headers.1.len(), bodies.1.len()); - let mut imported = 0; - for i in 0..count { - let mut block_rlp = RlpStream::new_list(3); - block_rlp.append_raw(&headers.1[i].data, 1); - let body = Rlp::new(&bodies.1[i]); - block_rlp.append_raw(body.at(0).as_raw(), 1); - block_rlp.append_raw(body.at(1).as_raw(), 1); - let h = &headers.1[i].hash; - match io.chain().import_block(block_rlp.out()) { - Err(ImportError::AlreadyInChain) => { - trace!(target: "sync", "Block already in chain {:?}", h); - self.last_imported_block = Some(headers.0 + i as BlockNumber); - self.last_imported_hash = Some(h.clone()); - }, - Err(ImportError::AlreadyQueued) => { - trace!(target: "sync", "Block already queued {:?}", h); - self.last_imported_block = Some(headers.0 + i as BlockNumber); - self.last_imported_hash = Some(h.clone()); - }, - Ok(_) => { - trace!(target: "sync", "Block queued {:?}", h); - self.last_imported_block = Some(headers.0 + i as BlockNumber); - self.last_imported_hash = Some(h.clone()); - imported += 1; - }, - Err(e) => { - debug!(target: "sync", "Bad block {:?} : {:?}", h, e); - restart = true; - } - } - } - trace!(target: "sync", "Imported {} of {}", imported, count); - } - - if restart { - self.restart(io); - return; - } - - self.headers.remove_head(&(self.last_imported_block.unwrap() + 1)); - self.bodies.remove_head(&(self.last_imported_block.unwrap() + 1)); - - if self.headers.is_empty() { - assert!(self.bodies.is_empty()); - self.complete_sync(); - } - } - - /// Remove downloaded bocks/headers starting from specified number. - /// Used to recover from an error and re-download parts of the chain detected as bad. - fn remove_downloaded_blocks(&mut self, start: BlockNumber) { - for n in self.headers.get_tail(&start) { - if let Some(ref header_data) = self.headers.find_item(&n) { - let header_to_delete = HeaderView::new(&header_data.data); - let header_id = HeaderId { - transactions_root: header_to_delete.transactions_root(), - uncles: header_to_delete.uncles_hash() - }; - self.header_ids.remove(&header_id); - } - self.downloading_bodies.remove(&n); - self.downloading_headers.remove(&n); - } - self.headers.remove_tail(&start); - self.bodies.remove_tail(&start); - } - - /// Request headers from a peer by block hash - fn request_headers_by_hash(&mut self, sync: &mut SyncIo, peer_id: PeerId, h: &H256, count: usize, skip: usize, reverse: bool) { - trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, h); - let mut rlp = RlpStream::new_list(4); - rlp.append(h); - rlp.append(&count); - rlp.append(&skip); - rlp.append(&if reverse {1u32} else {0u32}); - self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); - } - - /// Request headers from a peer by block number - fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool) { - let mut rlp = RlpStream::new_list(4); - trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, n); - rlp.append(&n); - rlp.append(&count); - rlp.append(&skip); - rlp.append(&if reverse {1u32} else {0u32}); - self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); - } - - /// Request block bodies from a peer - fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec) { - let mut rlp = RlpStream::new_list(hashes.len()); - trace!(target: "sync", "{} <- GetBlockBodies: {} entries", peer_id, hashes.len()); - for h in hashes { - rlp.append(&h); - } - self.send_request(sync, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out()); - } - - /// Reset peer status after request is complete. - fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - if peer.asking != asking { - warn!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); - } - else { - peer.asking = PeerAsking::Nothing; - } - } - - /// Generic request sender - fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { - { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - if peer.asking != PeerAsking::Nothing { - warn!(target:"sync", "Asking {:?} while requesting {:?}", asking, peer.asking); - } - } - match sync.send(peer_id, packet_id, packet) { - Err(e) => { - warn!(target:"sync", "Error sending request: {:?}", e); - sync.disable_peer(peer_id); - self.on_peer_aborting(sync, peer_id); - } - Ok(_) => { - let mut peer = self.peers.get_mut(&peer_id).unwrap(); - peer.asking = asking; - } - } - } - - /// Called when peer sends us new transactions - fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - Ok(()) - } - - /// Send Status message - fn send_status(&mut self, io: &mut SyncIo, peer_id: PeerId) { - let mut packet = RlpStream::new_list(5); - let chain = io.chain().chain_info(); - packet.append(&(PROTOCOL_VERSION as u32)); - packet.append(&NETWORK_ID); //TODO: network id - packet.append(&chain.total_difficulty); - packet.append(&chain.best_block_hash); - packet.append(&chain.genesis_hash); - //TODO: handle timeout for status request - if let Err(e) = io.send(peer_id, STATUS_PACKET, packet.out()) { - warn!(target:"sync", "Error sending status request: {:?}", e); - io.disable_peer(peer_id); - } - } - - /// Respond to GetBlockHeaders request - fn return_block_headers(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - // Packet layout: - // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] - let max_headers: usize = try!(r.val_at(1)); - let skip: usize = try!(r.val_at(2)); - let reverse: bool = try!(r.val_at(3)); - let last = io.chain().chain_info().best_block_number; - let mut number = if try!(r.at(0)).size() == 32 { - // id is a hash - let hash: H256 = try!(r.val_at(0)); - trace!(target: "sync", "-> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse); - match io.chain().block_header(&hash) { - Some(hdr) => From::from(HeaderView::new(&hdr).number()), - None => last - } - } - else { - trace!(target: "sync", "-> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", try!(r.val_at::(0)), max_headers, skip, reverse); - try!(r.val_at(0)) - }; - - if reverse { - number = min(last, number); - } else { - number = max(1, number); - } - let max_count = min(MAX_HEADERS_TO_SEND, max_headers); - let mut count = 0; - let mut data = Bytes::new(); - let inc = (skip + 1) as BlockNumber; - while number <= last && number > 0 && count < max_count { - if let Some(mut hdr) = io.chain().block_header_at(number) { - data.append(&mut hdr); - count += 1; - } - if reverse { - if number <= inc { - break; - } - number -= inc; - } - else { - number += inc; - } - } - let mut rlp = RlpStream::new_list(count as usize); - rlp.append_raw(&data, count as usize); - io.respond(BLOCK_HEADERS_PACKET, rlp.out()).unwrap_or_else(|e| - debug!(target: "sync", "Error sending headers: {:?}", e)); - trace!(target: "sync", "-> GetBlockHeaders: returned {} entries", count); - Ok(()) - } - - /// Respond to GetBlockBodies request - fn return_block_bodies(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let mut count = r.item_count(); - if count == 0 { - debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); - return Ok(()); - } - trace!(target: "sync", "-> GetBlockBodies: {} entries", count); - count = min(count, MAX_BODIES_TO_SEND); - let mut added = 0usize; - let mut data = Bytes::new(); - for i in 0..count { - if let Some(mut hdr) = io.chain().block_body(&try!(r.val_at::(i))) { - data.append(&mut hdr); - added += 1; - } - } - let mut rlp = RlpStream::new_list(added); - rlp.append_raw(&data, added); - io.respond(BLOCK_BODIES_PACKET, rlp.out()).unwrap_or_else(|e| - debug!(target: "sync", "Error sending headers: {:?}", e)); - trace!(target: "sync", "-> GetBlockBodies: returned {} entries", added); - Ok(()) - } - - /// Respond to GetNodeData request - fn return_node_data(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let mut count = r.item_count(); - if count == 0 { - debug!(target: "sync", "Empty GetNodeData request, ignoring."); - return Ok(()); - } - count = min(count, MAX_NODE_DATA_TO_SEND); - let mut added = 0usize; - let mut data = Bytes::new(); - for i in 0..count { - if let Some(mut hdr) = io.chain().state_data(&try!(r.val_at::(i))) { - data.append(&mut hdr); - added += 1; - } - } - let mut rlp = RlpStream::new_list(added); - rlp.append_raw(&data, added); - io.respond(NODE_DATA_PACKET, rlp.out()).unwrap_or_else(|e| - debug!(target: "sync", "Error sending headers: {:?}", e)); - Ok(()) - } - - /// Respond to GetReceipts request - fn return_receipts(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let mut count = r.item_count(); - if count == 0 { - debug!(target: "sync", "Empty GetReceipts request, ignoring."); - return Ok(()); - } - count = min(count, MAX_RECEIPTS_TO_SEND); - let mut added = 0usize; - let mut data = Bytes::new(); - for i in 0..count { - if let Some(mut hdr) = io.chain().block_receipts(&try!(r.val_at::(i))) { - data.append(&mut hdr); - added += 1; - } - } - let mut rlp = RlpStream::new_list(added); - rlp.append_raw(&data, added); - io.respond(RECEIPTS_PACKET, rlp.out()).unwrap_or_else(|e| - debug!(target: "sync", "Error sending headers: {:?}", e)); - Ok(()) - } - - /// Dispatch incoming requests and responses - pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { - let rlp = UntrustedRlp::new(data); - let result = match packet_id { - STATUS_PACKET => self.on_peer_status(io, peer, &rlp), - TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), - GET_BLOCK_HEADERS_PACKET => self.return_block_headers(io, &rlp), - BLOCK_HEADERS_PACKET => self.on_peer_block_headers(io, peer, &rlp), - GET_BLOCK_BODIES_PACKET => self.return_block_bodies(io, &rlp), - BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp), - NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp), - NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), - GET_NODE_DATA_PACKET => self.return_node_data(io, &rlp), - GET_RECEIPTS_PACKET => self.return_receipts(io, &rlp), - _ => { - debug!(target: "sync", "Unknown packet {}", packet_id); - Ok(()) - } - }; - result.unwrap_or_else(|e| { - debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); - }) - } - - /// Maintain other peers. Send out any new blocks and transactions - pub fn _maintain_sync(&mut self, _io: &mut SyncIo) { - } -} - diff --git a/sync/io.rs b/sync/io.rs deleted file mode 100644 index 501d36ad2..000000000 --- a/sync/io.rs +++ /dev/null @@ -1,62 +0,0 @@ -use client::BlockChainClient; -use util::{NetworkContext, PeerId, PacketId,}; -use util::error::UtilError; -use service::SyncMessage; - -/// IO interface for the syning handler. -/// Provides peer connection management and an interface to the blockchain client. -// TODO: ratings -pub trait SyncIo { - /// Disable a peer - fn disable_peer(&mut self, peer_id: PeerId); - /// Respond to current request with a packet. Can be called from an IO handler for incoming packet. - fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError>; - /// Send a packet to a peer. - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError>; - /// Get the blockchain - fn chain(&self) -> &BlockChainClient; - /// Returns peer client identifier string - fn peer_info(&self, peer_id: PeerId) -> String { - peer_id.to_string() - } -} - -/// Wraps `NetworkContext` and the blockchain client -pub struct NetSyncIo<'s, 'h> where 'h: 's { - network: &'s NetworkContext<'h, SyncMessage>, - chain: &'s BlockChainClient -} - -impl<'s, 'h> NetSyncIo<'s, 'h> { - /// Creates a new instance from the `NetworkContext` and the blockchain client reference. - pub fn new(network: &'s NetworkContext<'h, SyncMessage>, chain: &'s BlockChainClient) -> NetSyncIo<'s, 'h> { - NetSyncIo { - network: network, - chain: chain, - } - } -} - -impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> { - fn disable_peer(&mut self, peer_id: PeerId) { - self.network.disable_peer(peer_id); - } - - fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError>{ - self.network.respond(packet_id, data) - } - - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError>{ - self.network.send(peer_id, packet_id, data) - } - - fn chain(&self) -> &BlockChainClient { - self.chain - } - - fn peer_info(&self, peer_id: PeerId) -> String { - self.network.peer_info(peer_id) - } -} - - diff --git a/sync/mod.rs b/sync/mod.rs deleted file mode 100644 index 34a1e429d..000000000 --- a/sync/mod.rs +++ /dev/null @@ -1,94 +0,0 @@ -/// Blockchain sync module -/// Implements ethereum protocol version 63 as specified here: -/// https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol -/// -/// Usage example: -/// -/// ```rust -/// extern crate ethcore_util as util; -/// extern crate ethcore; -/// use std::env; -/// use std::sync::Arc; -/// use util::network::{NetworkService, NetworkConfiguration}; -/// use ethcore::client::Client; -/// use ethcore::sync::EthSync; -/// use ethcore::ethereum; -/// -/// fn main() { -/// let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); -/// let dir = env::temp_dir(); -/// let client = Client::new(ethereum::new_frontier(), &dir, service.io().channel()).unwrap(); -/// EthSync::register(&mut service, client); -/// } -/// ``` - -use std::ops::*; -use std::sync::*; -use client::Client; -use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; -use sync::chain::ChainSync; -use service::SyncMessage; -use sync::io::NetSyncIo; - -mod chain; -mod io; -mod range_collection; - -#[cfg(test)] -mod tests; - -/// Ethereum network protocol handler -pub struct EthSync { - /// Shared blockchain client. TODO: this should evetually become an IPC endpoint - chain: Arc, - /// Sync strategy - sync: RwLock -} - -pub use self::chain::SyncStatus; - -impl EthSync { - /// Creates and register protocol with the network service - pub fn register(service: &mut NetworkService, chain: Arc) -> Arc { - let sync = Arc::new(EthSync { - chain: chain, - sync: RwLock::new(ChainSync::new()), - }); - service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); - sync - } - - /// Get sync status - pub fn status(&self) -> SyncStatus { - self.sync.read().unwrap().status() - } - - /// Stop sync - pub fn stop(&mut self, io: &mut NetworkContext) { - self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref())); - } - - /// Restart sync - pub fn restart(&mut self, io: &mut NetworkContext) { - self.sync.write().unwrap().restart(&mut NetSyncIo::new(io, self.chain.deref())); - } -} - -impl NetworkProtocolHandler for EthSync { - fn initialize(&self, _io: &NetworkContext) { - } - - fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { - self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data); - } - - fn connected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.write().unwrap().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer); - } - - fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer); - } -} - - diff --git a/sync/range_collection.rs b/sync/range_collection.rs deleted file mode 100644 index b8186e5a5..000000000 --- a/sync/range_collection.rs +++ /dev/null @@ -1,260 +0,0 @@ -/// This module defines a trait for a collection of ranged values and an implementation -/// for this trait over sorted vector. - -use std::ops::{Add, Sub, Range}; - -pub trait ToUsize { - fn to_usize(&self) -> usize; -} - -pub trait FromUsize { - fn from_usize(s: usize) -> Self; -} - -/// A key-value collection orderd by key with sequential key-value pairs grouped together. -/// Such group is called a range. -/// E.g. a set of collection of 5 pairs {1, a}, {2, b}, {10, x}, {11, y}, {12, z} will be grouped into two ranges: {1, [a,b]}, {10, [x,y,z]} -pub trait RangeCollection { - /// Check if the given key is present in the collection. - fn have_item(&self, key: &K) -> bool; - /// Get value by key. - fn find_item(&self, key: &K) -> Option<&V>; - /// Get a range of keys from `key` till the end of the range that has `key` - /// Returns an empty range is key does not exist. - fn get_tail(&mut self, key: &K) -> Range; - /// Remove all elements < `start` in the range that contains `start` - 1 - fn remove_head(&mut self, start: &K); - /// Remove all elements >= `start` in the range that contains `start` - fn remove_tail(&mut self, start: &K); - /// Remove all elements >= `tail` - fn insert_item(&mut self, key: K, value: V); - /// Get an iterator over ranges - fn range_iter(& self) -> RangeIterator; -} - -/// Range iterator. For each range yelds a key for the first element of the range and a vector of values. -pub struct RangeIterator<'c, K:'c, V:'c> { - range: usize, - collection: &'c Vec<(K, Vec)> -} - -impl<'c, K:'c, V:'c> Iterator for RangeIterator<'c, K, V> where K: Add + FromUsize + ToUsize + Copy { - type Item = (K, &'c [V]); - // The 'Iterator' trait only requires the 'next' method to be defined. The - // return type is 'Option', 'None' is returned when the 'Iterator' is - // over, otherwise the next value is returned wrapped in 'Some' - fn next(&mut self) -> Option<(K, &'c [V])> { - if self.range > 0 { - self.range -= 1; - } - else { - return None; - } - match self.collection.get(self.range) { - Some(&(ref k, ref vec)) => { - Some((*k, &vec)) - }, - None => None - } - } -} - -impl RangeCollection for Vec<(K, Vec)> where K: Ord + PartialEq + Add + Sub + Copy + FromUsize + ToUsize { - fn range_iter(&self) -> RangeIterator { - RangeIterator { - range: self.len(), - collection: self - } - } - - fn have_item(&self, key: &K) -> bool { - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(_) => true, - Err(index) => match self.get(index) { - Some(&(ref k, ref v)) => k <= key && (*k + FromUsize::from_usize(v.len())) > *key, - _ => false - }, - } - } - - fn find_item(&self, key: &K) -> Option<&V> { - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(index) => self.get(index).unwrap().1.get(0), - Err(index) => match self.get(index) { - Some(&(ref k, ref v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => v.get((*key - *k).to_usize()), - _ => None - }, - } - } - - fn get_tail(&mut self, key: &K) -> Range { - let kv = *key; - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(index) => kv..(kv + FromUsize::from_usize(self[index].1.len())), - Err(index) => { - match self.get_mut(index) { - Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { - kv..(*k + FromUsize::from_usize(v.len())) - } - _ => kv..kv - } - }, - } - } - /// Remove element key and following elements in the same range - fn remove_tail(&mut self, key: &K) { - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(index) => { self.remove(index); }, - Err(index) =>{ - let mut empty = false; - match self.get_mut(index) { - Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { - v.truncate((*key - *k).to_usize()); - empty = v.is_empty(); - } - _ => {} - } - if empty { - self.remove(index); - } - }, - } - } - - /// Remove range elements up to key - fn remove_head(&mut self, key: &K) { - if *key == FromUsize::from_usize(0) { - return - } - - let prev = *key - FromUsize::from_usize(1); - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(_) => { }, //start of range, do nothing. - Err(index) => { - let mut empty = false; - match self.get_mut(index) { - Some(&mut (ref mut k, ref mut v)) if *k <= prev && (*k + FromUsize::from_usize(v.len())) > prev => { - let tail = v.split_off((*key - *k).to_usize()); - empty = tail.is_empty(); - let removed = ::std::mem::replace(v, tail); - let new_k = *k + FromUsize::from_usize(removed.len()); - ::std::mem::replace(k, new_k); - } - _ => {} - } - if empty { - self.remove(index); - } - }, - } - } - - fn insert_item(&mut self, key: K, value: V) { - assert!(!self.have_item(&key)); - - let lower = match self.binary_search_by(|&(k, _)| k.cmp(&key).reverse()) { - Ok(index) => index, - Err(index) => index, - }; - - let mut to_remove: Option = None; - if lower < self.len() && self[lower].0 + FromUsize::from_usize(self[lower].1.len()) == key { - // extend into existing chunk - self[lower].1.push(value); - } - else { - // insert a new chunk - let range: Vec = vec![value]; - self.insert(lower, (key, range)); - }; - if lower > 0 { - let next = lower - 1; - if next < self.len() - { - { - let (mut next, mut inserted) = self.split_at_mut(lower); - let mut next = next.last_mut().unwrap(); - let mut inserted = inserted.first_mut().unwrap(); - if next.0 == key + FromUsize::from_usize(1) - { - inserted.1.append(&mut next.1); - to_remove = Some(lower - 1); - } - } - - if let Some(r) = to_remove { - self.remove(r); - } - } - } - } -} - -#[test] -#[allow(cyclomatic_complexity)] -fn test_range() { - use std::cmp::{Ordering}; - - let mut ranges: Vec<(u64, Vec)> = Vec::new(); - assert_eq!(ranges.range_iter().next(), None); - assert_eq!(ranges.find_item(&1), None); - assert!(!ranges.have_item(&1)); - assert_eq!(ranges.get_tail(&0), 0..0); - - ranges.insert_item(17, 'q'); - assert_eq!(ranges.range_iter().cmp(vec![(17, &['q'][..])]), Ordering::Equal); - assert_eq!(ranges.find_item(&17), Some(&'q')); - assert!(ranges.have_item(&17)); - assert_eq!(ranges.get_tail(&17), 17..18); - - ranges.insert_item(18, 'r'); - assert_eq!(ranges.range_iter().cmp(vec![(17, &['q', 'r'][..])]), Ordering::Equal); - assert_eq!(ranges.find_item(&18), Some(&'r')); - assert!(ranges.have_item(&18)); - assert_eq!(ranges.get_tail(&17), 17..19); - - ranges.insert_item(16, 'p'); - assert_eq!(ranges.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal); - assert_eq!(ranges.find_item(&16), Some(&'p')); - assert_eq!(ranges.find_item(&17), Some(&'q')); - assert_eq!(ranges.find_item(&18), Some(&'r')); - assert!(ranges.have_item(&16)); - assert_eq!(ranges.get_tail(&17), 17..19); - assert_eq!(ranges.get_tail(&16), 16..19); - - ranges.insert_item(2, 'b'); - assert_eq!(ranges.range_iter().cmp(vec![(2, &['b'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - assert_eq!(ranges.find_item(&2), Some(&'b')); - - ranges.insert_item(3, 'c'); - ranges.insert_item(4, 'd'); - assert_eq!(ranges.get_tail(&3), 3..5); - assert_eq!(ranges.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - - let mut r = ranges.clone(); - r.remove_head(&1); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&2); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&3); - assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&10); - assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&5); - assert_eq!(r.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_head(&19); - assert_eq!(r.range_iter().next(), None); - - let mut r = ranges.clone(); - r.remove_tail(&20); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); - r.remove_tail(&17); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal); - r.remove_tail(&16); - assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal); - r.remove_tail(&3); - assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal); - r.remove_tail(&2); - assert_eq!(r.range_iter().next(), None); -} - diff --git a/sync/tests.rs b/sync/tests.rs deleted file mode 100644 index eb09b467a..000000000 --- a/sync/tests.rs +++ /dev/null @@ -1,406 +0,0 @@ -use util::*; -use client::{BlockChainClient, BlockStatus, TreeRoute, BlockChainInfo}; -use block_queue::BlockQueueInfo; -use header::{Header as BlockHeader, BlockNumber}; -use error::*; -use sync::io::SyncIo; -use sync::chain::{ChainSync, SyncState}; - -struct TestBlockChainClient { - blocks: RwLock>, - numbers: RwLock>, - genesis_hash: H256, - last_hash: RwLock, - difficulty: RwLock, -} - -impl TestBlockChainClient { - fn new() -> TestBlockChainClient { - - let mut client = TestBlockChainClient { - blocks: RwLock::new(HashMap::new()), - numbers: RwLock::new(HashMap::new()), - genesis_hash: H256::new(), - last_hash: RwLock::new(H256::new()), - difficulty: RwLock::new(From::from(0)), - }; - client.add_blocks(1, true); // add genesis block - client.genesis_hash = client.last_hash.read().unwrap().clone(); - client - } - - pub fn add_blocks(&mut self, count: usize, empty: bool) { - let len = self.numbers.read().unwrap().len(); - for n in len..(len + count) { - let mut header = BlockHeader::new(); - header.difficulty = From::from(n); - header.parent_hash = self.last_hash.read().unwrap().clone(); - header.number = n as BlockNumber; - let mut uncles = RlpStream::new_list(if empty {0} else {1}); - if !empty { - uncles.append(&H256::from(&U256::from(n))); - header.uncles_hash = uncles.as_raw().sha3(); - } - let mut rlp = RlpStream::new_list(3); - rlp.append(&header); - rlp.append_raw(&rlp::NULL_RLP, 1); - rlp.append_raw(uncles.as_raw(), 1); - self.import_block(rlp.as_raw().to_vec()).unwrap(); - } - } -} - -impl BlockChainClient for TestBlockChainClient { - fn block_total_difficulty(&self, _h: &H256) -> Option { - unimplemented!(); - } - - fn block_header(&self, h: &H256) -> Option { - self.blocks.read().unwrap().get(h).map(|r| Rlp::new(r).at(0).as_raw().to_vec()) - - } - - fn block_body(&self, h: &H256) -> Option { - self.blocks.read().unwrap().get(h).map(|r| { - let mut stream = RlpStream::new_list(2); - stream.append_raw(Rlp::new(&r).at(1).as_raw(), 1); - stream.append_raw(Rlp::new(&r).at(2).as_raw(), 1); - stream.out() - }) - } - - fn block(&self, h: &H256) -> Option { - self.blocks.read().unwrap().get(h).cloned() - } - - fn block_status(&self, h: &H256) -> BlockStatus { - match self.blocks.read().unwrap().get(h) { - Some(_) => BlockStatus::InChain, - None => BlockStatus::Unknown - } - } - - fn block_total_difficulty_at(&self, _number: BlockNumber) -> Option { - unimplemented!(); - } - - fn block_header_at(&self, n: BlockNumber) -> Option { - self.numbers.read().unwrap().get(&(n as usize)).and_then(|h| self.block_header(h)) - } - - fn block_body_at(&self, n: BlockNumber) -> Option { - self.numbers.read().unwrap().get(&(n as usize)).and_then(|h| self.block_body(h)) - } - - fn block_at(&self, n: BlockNumber) -> Option { - self.numbers.read().unwrap().get(&(n as usize)).map(|h| self.blocks.read().unwrap().get(h).unwrap().clone()) - } - - fn block_status_at(&self, n: BlockNumber) -> BlockStatus { - if (n as usize) < self.blocks.read().unwrap().len() { - BlockStatus::InChain - } else { - BlockStatus::Unknown - } - } - - fn tree_route(&self, _from: &H256, _to: &H256) -> Option { - Some(TreeRoute { - blocks: Vec::new(), - ancestor: H256::new(), - index: 0 - }) - } - - fn state_data(&self, _h: &H256) -> Option { - None - } - - fn block_receipts(&self, _h: &H256) -> Option { - None - } - - fn import_block(&self, b: Bytes) -> ImportResult { - let header = Rlp::new(&b).val_at::(0); - let h = header.hash(); - let number: usize = header.number as usize; - if number > self.blocks.read().unwrap().len() { - panic!("Unexpected block number. Expected {}, got {}", self.blocks.read().unwrap().len(), number); - } - if number > 0 { - match self.blocks.read().unwrap().get(&header.parent_hash) { - Some(parent) => { - let parent = Rlp::new(parent).val_at::(0); - if parent.number != (header.number - 1) { - panic!("Unexpected block parent"); - } - }, - None => { - panic!("Unknown block parent {:?} for block {}", header.parent_hash, number); - } - } - } - let len = self.numbers.read().unwrap().len(); - if number == len { - *self.difficulty.write().unwrap().deref_mut() += header.difficulty; - mem::replace(self.last_hash.write().unwrap().deref_mut(), h.clone()); - self.blocks.write().unwrap().insert(h.clone(), b); - self.numbers.write().unwrap().insert(number, h.clone()); - let mut parent_hash = header.parent_hash; - if number > 0 { - let mut n = number - 1; - while n > 0 && self.numbers.read().unwrap()[&n] != parent_hash { - *self.numbers.write().unwrap().get_mut(&n).unwrap() = parent_hash.clone(); - n -= 1; - parent_hash = Rlp::new(&self.blocks.read().unwrap()[&parent_hash]).val_at::(0).parent_hash; - } - } - } - else { - self.blocks.write().unwrap().insert(h.clone(), b.to_vec()); - } - Ok(h) - } - - fn queue_info(&self) -> BlockQueueInfo { - BlockQueueInfo { - full: false, - verified_queue_size: 0, - unverified_queue_size: 0, - verifying_queue_size: 0, - } - } - - fn clear_queue(&self) { - } - - fn chain_info(&self) -> BlockChainInfo { - BlockChainInfo { - total_difficulty: *self.difficulty.read().unwrap(), - pending_total_difficulty: *self.difficulty.read().unwrap(), - genesis_hash: self.genesis_hash.clone(), - best_block_hash: self.last_hash.read().unwrap().clone(), - best_block_number: self.blocks.read().unwrap().len() as BlockNumber - 1, - } - } -} - -struct TestIo<'p> { - chain: &'p mut TestBlockChainClient, - queue: &'p mut VecDeque, - sender: Option, -} - -impl<'p> TestIo<'p> { - fn new(chain: &'p mut TestBlockChainClient, queue: &'p mut VecDeque, sender: Option) -> TestIo<'p> { - TestIo { - chain: chain, - queue: queue, - sender: sender - } - } -} - -impl<'p> SyncIo for TestIo<'p> { - fn disable_peer(&mut self, _peer_id: PeerId) { - } - - fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { - self.queue.push_back(TestPacket { - data: data, - packet_id: packet_id, - recipient: self.sender.unwrap() - }); - Ok(()) - } - - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { - self.queue.push_back(TestPacket { - data: data, - packet_id: packet_id, - recipient: peer_id, - }); - Ok(()) - } - - fn chain(&self) -> &BlockChainClient { - self.chain - } -} - -struct TestPacket { - data: Bytes, - packet_id: PacketId, - recipient: PeerId, -} - -struct TestPeer { - chain: TestBlockChainClient, - sync: ChainSync, - queue: VecDeque, -} - -struct TestNet { - peers: Vec, - started: bool -} - -impl TestNet { - pub fn new(n: usize) -> TestNet { - let mut net = TestNet { - peers: Vec::new(), - started: false - }; - for _ in 0..n { - net.peers.push(TestPeer { - chain: TestBlockChainClient::new(), - sync: ChainSync::new(), - queue: VecDeque::new(), - }); - } - net - } - - pub fn peer(&self, i: usize) -> &TestPeer { - self.peers.get(i).unwrap() - } - - pub fn peer_mut(&mut self, i: usize) -> &mut TestPeer { - self.peers.get_mut(i).unwrap() - } - - pub fn start(&mut self) { - for peer in 0..self.peers.len() { - for client in 0..self.peers.len() { - if peer != client { - let mut p = self.peers.get_mut(peer).unwrap(); - p.sync.on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId); - } - } - } - } - - pub fn sync_step(&mut self) { - for peer in 0..self.peers.len() { - if let Some(packet) = self.peers[peer].queue.pop_front() { - let mut p = self.peers.get_mut(packet.recipient).unwrap(); - trace!("--- {} -> {} ---", peer, packet.recipient); - p.sync.on_packet(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data); - trace!("----------------"); - } - let mut p = self.peers.get_mut(peer).unwrap(); - p.sync._maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); - } - } - - pub fn restart_peer(&mut self, i: usize) { - let peer = self.peer_mut(i); - peer.sync.restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); - } - - pub fn sync(&mut self) -> u32 { - self.start(); - let mut total_steps = 0; - while !self.done() { - self.sync_step(); - total_steps = total_steps + 1; - } - total_steps - } - - pub fn sync_steps(&mut self, count: usize) { - if !self.started { - self.start(); - self.started = true; - } - for _ in 0..count { - self.sync_step(); - } - } - - pub fn done(&self) -> bool { - self.peers.iter().all(|p| p.queue.is_empty()) - } -} - -#[test] -fn chain_two_peers() { - let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); - net.sync(); - assert!(net.peer(0).chain.block_at(1000).is_some()); - assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); -} - -#[test] -fn chain_status_after_sync() { - let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); - net.sync(); - let status = net.peer(0).sync.status(); - assert_eq!(status.state, SyncState::Idle); -} - -#[test] -fn chain_takes_few_steps() { - let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(100, false); - net.peer_mut(2).chain.add_blocks(100, false); - let total_steps = net.sync(); - assert!(total_steps < 7); -} - -#[test] -fn chain_empty_blocks() { - let mut net = TestNet::new(3); - for n in 0..200 { - net.peer_mut(1).chain.add_blocks(5, n % 2 == 0); - net.peer_mut(2).chain.add_blocks(5, n % 2 == 0); - } - net.sync(); - assert!(net.peer(0).chain.block_at(1000).is_some()); - assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); -} - -#[test] -fn chain_forked() { - let mut net = TestNet::new(3); - net.peer_mut(0).chain.add_blocks(300, false); - net.peer_mut(1).chain.add_blocks(300, false); - net.peer_mut(2).chain.add_blocks(300, false); - net.peer_mut(0).chain.add_blocks(100, true); //fork - net.peer_mut(1).chain.add_blocks(200, false); - net.peer_mut(2).chain.add_blocks(200, false); - net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2 - net.peer_mut(2).chain.add_blocks(10, true); - // peer 1 has the best chain of 601 blocks - let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); - net.sync(); - assert_eq!(net.peer(0).chain.numbers.read().unwrap().deref(), &peer1_chain); - assert_eq!(net.peer(1).chain.numbers.read().unwrap().deref(), &peer1_chain); - assert_eq!(net.peer(2).chain.numbers.read().unwrap().deref(), &peer1_chain); -} - -#[test] -fn chain_restart() { - let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); - - net.sync_steps(8); - - // make sure that sync has actually happened - assert!(net.peer(0).chain.chain_info().best_block_number > 100); - net.restart_peer(0); - - let status = net.peer(0).sync.status(); - assert_eq!(status.state, SyncState::NotSynced); -} - -#[test] -fn chain_status_empty() { - let net = TestNet::new(2); - assert_eq!(net.peer(0).sync.status().state, SyncState::NotSynced); -}