diff --git a/bin/Cargo.toml b/bin/Cargo.toml index 7174ada14..8fc233796 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -15,6 +15,7 @@ ctrlc = "1.0" ethcore-util = { path = "../util" } ethcore-rpc = { path = "../rpc", optional = true } ethcore = { path = ".." } +ethsync = { path = "../sync" } clippy = "0.0.37" [features] diff --git a/bin/src/main.rs b/bin/src/main.rs index 3d4199fcf..92f0cbf20 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -8,6 +8,7 @@ extern crate docopt; extern crate rustc_serialize; extern crate ethcore_util as util; extern crate ethcore; +extern crate ethsync; extern crate log; extern crate env_logger; extern crate ctrlc; @@ -24,7 +25,7 @@ use ethcore::client::*; use ethcore::service::{ClientService, NetSyncMessage}; use ethcore::ethereum; use ethcore::blockchain::CacheSize; -use ethcore::sync::EthSync; +use ethsync::EthSync; docopt!(Args derive Debug, " Parity. Ethereum Client. @@ -81,8 +82,10 @@ fn main() { let mut net_settings = NetworkConfiguration::new(); net_settings.boot_nodes = init_nodes; let mut service = ClientService::start(spec, net_settings).unwrap(); - setup_rpc_server(service.client(), service.sync()); - let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: service.sync() }); + let client = service.client().clone(); + let sync = EthSync::register(service.network(), client); + setup_rpc_server(service.client(), sync.clone()); + let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: sync }); service.io().register_handler(io_handler).expect("Error registering IO handler"); let exit = Arc::new(Condvar::new()); diff --git a/src/lib.rs b/src/lib.rs index e084635dd..0652d964e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,8 +151,6 @@ mod tests; /// TODO [arkpar] Please document me pub mod client; /// TODO [arkpar] Please document me -pub mod sync; -/// TODO [arkpar] Please document me pub mod block; /// TODO [arkpar] Please document me pub mod verification; diff --git a/src/service.rs b/src/service.rs index 8c900d20a..a56ed7f44 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,5 +1,4 @@ use util::*; -use sync::*; use spec::Spec; use error::*; use std::env; @@ -21,7 +20,6 @@ pub type NetSyncMessage = NetworkIoMessage; pub struct ClientService { net_service: NetworkService, client: Arc, - sync: Arc, } impl ClientService { @@ -34,7 +32,6 @@ impl ClientService { dir.push(".parity"); dir.push(H64::from(spec.genesis_header().hash()).hex()); let client = try!(Client::new(spec, &dir, net_service.io().channel())); - let sync = EthSync::register(&mut net_service, client.clone()); let client_io = Arc::new(ClientIoHandler { client: client.clone() }); @@ -43,29 +40,28 @@ impl ClientService { Ok(ClientService { net_service: net_service, client: client, - sync: sync, }) } - /// Get the network service. + /// Add a node to network pub fn add_node(&mut self, _enode: &str) { unimplemented!(); } - /// TODO [arkpar] Please document me + /// Get general IO interface pub fn io(&mut self) -> &mut IoService { self.net_service.io() } - /// TODO [arkpar] Please document me + /// Get client interface pub fn client(&self) -> Arc { self.client.clone() } - - /// Get shared sync handler - pub fn sync(&self) -> Arc { - self.sync.clone() + + /// Get network service component + pub fn network(&mut self) -> &mut NetworkService { + &mut self.net_service } } diff --git a/sync/Cargo.toml b/sync/Cargo.toml new file mode 100644 index 000000000..c3ae470fd --- /dev/null +++ b/sync/Cargo.toml @@ -0,0 +1,16 @@ +[package] +description = "Ethcore blockchain sync" +name = "ethsync" +version = "0.1.0" +license = "GPL-3.0" +authors = ["Ethcore usize { + *self as usize + } +} + +impl FromUsize for BlockNumber { + fn from_usize(s: usize) -> BlockNumber { + s as BlockNumber + } +} + +type PacketDecodeError = DecoderError; + +const PROTOCOL_VERSION: u8 = 63u8; +const MAX_BODIES_TO_SEND: usize = 256; +const MAX_HEADERS_TO_SEND: usize = 512; +const MAX_NODE_DATA_TO_SEND: usize = 1024; +const MAX_RECEIPTS_TO_SEND: usize = 1024; +const MAX_HEADERS_TO_REQUEST: usize = 512; +const MAX_BODIES_TO_REQUEST: usize = 256; + +const STATUS_PACKET: u8 = 0x00; +const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; +const TRANSACTIONS_PACKET: u8 = 0x02; +const GET_BLOCK_HEADERS_PACKET: u8 = 0x03; +const BLOCK_HEADERS_PACKET: u8 = 0x04; +const GET_BLOCK_BODIES_PACKET: u8 = 0x05; +const BLOCK_BODIES_PACKET: u8 = 0x06; +const NEW_BLOCK_PACKET: u8 = 0x07; + +const GET_NODE_DATA_PACKET: u8 = 0x0d; +const NODE_DATA_PACKET: u8 = 0x0e; +const GET_RECEIPTS_PACKET: u8 = 0x0f; +const RECEIPTS_PACKET: u8 = 0x10; + +const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent + +struct Header { + /// Header data + data: Bytes, + /// Block hash + hash: H256, + /// Parent hash + parent: H256, +} + +/// Used to identify header by transactions and uncles hashes +#[derive(Eq, PartialEq, Hash)] +struct HeaderId { + transactions_root: H256, + uncles: H256 +} + +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +/// Sync state +pub enum SyncState { + /// Initial chain sync has not started yet + NotSynced, + /// Initial chain sync complete. Waiting for new packets + Idle, + /// Block downloading paused. Waiting for block queue to process blocks and free some space + Waiting, + /// Downloading blocks + Blocks, + /// Downloading blocks learned from NewHashes packet + NewBlocks, +} + +/// Syncing status and statistics +pub struct SyncStatus { + /// State + pub state: SyncState, + /// Syncing protocol version. That's the maximum protocol version we connect to. + pub protocol_version: u8, + /// BlockChain height for the moment the sync started. + pub start_block_number: BlockNumber, + /// Last fully downloaded and imported block number. + pub last_imported_block_number: BlockNumber, + /// Highest block number in the download queue. + pub highest_block_number: BlockNumber, + /// Total number of blocks for the sync process. + pub blocks_total: usize, + /// Number of blocks downloaded so far. + pub blocks_received: usize, + /// Total number of connected peers + pub num_peers: usize, + /// Total number of active peers + pub num_active_peers: usize, +} + +#[derive(PartialEq, Eq, Debug)] +/// Peer data type requested +enum PeerAsking { + Nothing, + BlockHeaders, + BlockBodies, +} + +/// Syncing peer information +struct PeerInfo { + /// eth protocol version + protocol_version: u32, + /// Peer chain genesis hash + genesis: H256, + /// Peer network id + network_id: U256, + /// Peer best block hash + latest: H256, + /// Peer total difficulty + difficulty: U256, + /// Type of data currenty being requested from peer. + asking: PeerAsking, + /// A set of block numbers being requested + asking_blocks: Vec, +} + +/// Blockchain sync handler. +/// See module documentation for more details. +pub struct ChainSync { + /// Sync state + state: SyncState, + /// Last block number for the start of sync + starting_block: BlockNumber, + /// Highest block number seen + highest_block: BlockNumber, + /// Set of block header numbers being downloaded + downloading_headers: HashSet, + /// Set of block body numbers being downloaded + downloading_bodies: HashSet, + /// Downloaded headers. + headers: Vec<(BlockNumber, Vec
)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order + /// Downloaded bodies + bodies: Vec<(BlockNumber, Vec)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order + /// Peer info + peers: HashMap, + /// Used to map body to header + header_ids: HashMap, + /// Last impoted block number + last_imported_block: BlockNumber, + /// Last impoted block hash + last_imported_hash: H256, + /// Syncing total difficulty + syncing_difficulty: U256, + /// True if common block for our and remote chain has been found + have_common_block: bool, +} + + +impl ChainSync { + /// Create a new instance of syncing strategy. + pub fn new() -> ChainSync { + ChainSync { + state: SyncState::NotSynced, + starting_block: 0, + highest_block: 0, + downloading_headers: HashSet::new(), + downloading_bodies: HashSet::new(), + headers: Vec::new(), + bodies: Vec::new(), + peers: HashMap::new(), + header_ids: HashMap::new(), + last_imported_block: 0, + last_imported_hash: H256::new(), + syncing_difficulty: U256::from(0u64), + have_common_block: false, + } + } + + /// @returns Synchonization status + pub fn status(&self) -> SyncStatus { + SyncStatus { + state: self.state.clone(), + protocol_version: 63, + start_block_number: self.starting_block, + last_imported_block_number: self.last_imported_block, + highest_block_number: self.highest_block, + blocks_received: (self.last_imported_block - self.starting_block) as usize, + blocks_total: (self.highest_block - self.starting_block) as usize, + num_peers: self.peers.len(), + num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), + } + } + + /// Abort all sync activity + pub fn abort(&mut self, io: &mut SyncIo) { + self.restart(io); + self.peers.clear(); + } + + /// Rest sync. Clear all downloaded data but keep the queue + fn reset(&mut self) { + self.downloading_headers.clear(); + self.downloading_bodies.clear(); + self.headers.clear(); + self.bodies.clear(); + for (_, ref mut p) in &mut self.peers { + p.asking_blocks.clear(); + } + self.header_ids.clear(); + self.syncing_difficulty = From::from(0u64); + self.state = SyncState::Idle; + } + + /// Restart sync + pub fn restart(&mut self, io: &mut SyncIo) { + self.reset(); + self.last_imported_block = 0; + self.last_imported_hash = H256::new(); + self.starting_block = 0; + self.highest_block = 0; + self.have_common_block = false; + io.chain().clear_queue(); + self.starting_block = io.chain().chain_info().best_block_number; + self.state = SyncState::NotSynced; + } + + /// Called by peer to report status + fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let peer = PeerInfo { + protocol_version: try!(r.val_at(0)), + network_id: try!(r.val_at(1)), + difficulty: try!(r.val_at(2)), + latest: try!(r.val_at(3)), + genesis: try!(r.val_at(4)), + asking: PeerAsking::Nothing, + asking_blocks: Vec::new(), + }; + + trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); + + let chain_info = io.chain().chain_info(); + if peer.genesis != chain_info.genesis_hash { + io.disable_peer(peer_id); + trace!(target: "sync", "Peer {} genesis hash not matched", peer_id); + return Ok(()); + } + if peer.network_id != NETWORK_ID { + io.disable_peer(peer_id); + trace!(target: "sync", "Peer {} network id not matched", peer_id); + return Ok(()); + } + + let old = self.peers.insert(peer_id.clone(), peer); + if old.is_some() { + panic!("ChainSync: new peer already exists"); + } + info!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); + self.sync_peer(io, peer_id, false); + Ok(()) + } + + #[allow(cyclomatic_complexity)] + /// Called by peer once it has new block headers during sync + fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders); + let item_count = r.item_count(); + trace!(target: "sync", "{} -> BlockHeaders ({} entries)", peer_id, item_count); + self.clear_peer_download(peer_id); + if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { + trace!(target: "sync", "Ignored unexpected block headers"); + return Ok(()); + } + if self.state == SyncState::Waiting { + trace!(target: "sync", "Ignored block headers while waiting"); + return Ok(()); + } + + for i in 0..item_count { + let info: BlockHeader = try!(r.val_at(i)); + let number = BlockNumber::from(info.number); + if number <= self.last_imported_block || self.headers.have_item(&number) { + trace!(target: "sync", "Skipping existing block header"); + continue; + } + if number > self.highest_block { + self.highest_block = number; + } + let hash = info.hash(); + match io.chain().block_status(&hash) { + BlockStatus::InChain => { + self.have_common_block = true; + self.last_imported_block = number; + self.last_imported_hash = hash.clone(); + trace!(target: "sync", "Found common header {} ({})", number, hash); + }, + _ => { + if self.have_common_block { + //validate chain + if self.have_common_block && number == self.last_imported_block + 1 && info.parent_hash != self.last_imported_hash { + // TODO: lower peer rating + debug!(target: "sync", "Mismatched block header {} {}", number, hash); + continue; + } + if self.headers.find_item(&(number - 1)).map_or(false, |p| p.hash != info.parent_hash) { + // mismatching parent id, delete the previous block and don't add this one + // TODO: lower peer rating + debug!(target: "sync", "Mismatched block header {} {}", number, hash); + self.remove_downloaded_blocks(number - 1); + continue; + } + if self.headers.find_item(&(number + 1)).map_or(false, |p| p.parent != hash) { + // mismatching parent id for the next block, clear following headers + debug!(target: "sync", "Mismatched block header {}", number + 1); + self.remove_downloaded_blocks(number + 1); + } + } + let hdr = Header { + data: try!(r.at(i)).as_raw().to_vec(), + hash: hash.clone(), + parent: info.parent_hash, + }; + self.headers.insert_item(number, hdr); + let header_id = HeaderId { + transactions_root: info.transactions_root, + uncles: info.uncles_hash + }; + trace!(target: "sync", "Got header {} ({})", number, hash); + if header_id.transactions_root == rlp::SHA3_NULL_RLP && header_id.uncles == rlp::SHA3_EMPTY_LIST_RLP { + //empty body, just mark as downloaded + let mut body_stream = RlpStream::new_list(2); + body_stream.append_raw(&rlp::NULL_RLP, 1); + body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1); + self.bodies.insert_item(number, body_stream.out()); + } + else { + self.header_ids.insert(header_id, number); + } + } + } + } + self.collect_blocks(io); + self.continue_sync(io); + Ok(()) + } + + /// Called by peer once it has new block bodies + fn on_peer_block_bodies(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + use util::triehash::ordered_trie_root; + self.reset_peer_asking(peer_id, PeerAsking::BlockBodies); + let item_count = r.item_count(); + trace!(target: "sync", "{} -> BlockBodies ({} entries)", peer_id, item_count); + self.clear_peer_download(peer_id); + if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { + trace!(target: "sync", "Ignored unexpected block bodies"); + return Ok(()); + } + if self.state == SyncState::Waiting { + trace!(target: "sync", "Ignored block bodies while waiting"); + return Ok(()); + } + for i in 0..item_count { + let body = try!(r.at(i)); + let tx = try!(body.at(0)); + let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw().to_vec()).collect()); //TODO: get rid of vectors here + let uncles = try!(body.at(1)).as_raw().sha3(); + let header_id = HeaderId { + transactions_root: tx_root, + uncles: uncles + }; + match self.header_ids.get(&header_id).cloned() { + Some(n) => { + self.header_ids.remove(&header_id); + self.bodies.insert_item(n, body.as_raw().to_vec()); + trace!(target: "sync", "Got body {}", n); + } + None => { + debug!(target: "sync", "Ignored unknown block body"); + } + } + } + self.collect_blocks(io); + self.continue_sync(io); + Ok(()) + } + + /// Called by peer once it has new block bodies + fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let block_rlp = try!(r.at(0)); + let header_rlp = try!(block_rlp.at(0)); + let h = header_rlp.as_raw().sha3(); + + trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); + let header_view = HeaderView::new(header_rlp.as_raw()); + // TODO: Decompose block and add to self.headers and self.bodies instead + if header_view.number() == From::from(self.last_imported_block + 1) { + match io.chain().import_block(block_rlp.as_raw().to_vec()) { + Err(ImportError::AlreadyInChain) => { + trace!(target: "sync", "New block already in chain {:?}", h); + }, + Err(ImportError::AlreadyQueued) => { + trace!(target: "sync", "New block already queued {:?}", h); + }, + Ok(_) => { + trace!(target: "sync", "New block queued {:?}", h); + }, + Err(e) => { + debug!(target: "sync", "Bad new block {:?} : {:?}", h, e); + io.disable_peer(peer_id); + } + }; + } + else { + trace!(target: "sync", "New block unknown {:?}", h); + //TODO: handle too many unknown blocks + let difficulty: U256 = try!(r.val_at(1)); + let peer_difficulty = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").difficulty; + if difficulty > peer_difficulty { + trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); + { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + peer.latest = header_view.sha3(); + } + self.sync_peer(io, peer_id, true); + } + } + Ok(()) + } + + /// Handles NewHashes packet. Initiates headers download for any unknown hashes. + fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + if self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking != PeerAsking::Nothing { + trace!(target: "sync", "Ignoring new hashes since we're already downloading."); + return Ok(()); + } + trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()); + let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); + let mut max_height: U256 = From::from(0); + for (rh, rd) in hashes { + let h = try!(rh); + let d = try!(rd); + match io.chain().block_status(&h) { + BlockStatus::InChain => { + trace!(target: "sync", "New block hash already in chain {:?}", h); + }, + BlockStatus::Queued => { + trace!(target: "sync", "New hash block already queued {:?}", h); + }, + BlockStatus::Unknown => { + trace!(target: "sync", "New unknown block hash {:?}", h); + if d > max_height { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + peer.latest = h.clone(); + max_height = d; + } + }, + BlockStatus::Bad =>{ + debug!(target: "sync", "Bad new block hash {:?}", h); + io.disable_peer(peer_id); + return Ok(()); + } + } + }; + if max_height != x!(0) { + self.sync_peer(io, peer_id, true); + } + Ok(()) + } + + /// Called by peer when it is disconnecting + pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { + trace!(target: "sync", "== Disconnecting {}", peer); + if self.peers.contains_key(&peer) { + info!(target: "sync", "Disconnected {}:{}", peer, io.peer_info(peer)); + self.clear_peer_download(peer); + self.peers.remove(&peer); + self.continue_sync(io); + } + } + + /// Called when a new peer is connected + pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) { + trace!(target: "sync", "== Connected {}", peer); + self.send_status(io, peer); + } + + /// Resume downloading + fn continue_sync(&mut self, io: &mut SyncIo) { + let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty)).collect(); + peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating + for (p, _) in peers { + self.sync_peer(io, p, false); + } + } + + /// Called after all blocks have been donloaded + fn complete_sync(&mut self) { + trace!(target: "sync", "Sync complete"); + self.reset(); + self.state = SyncState::Idle; + } + + /// Enter waiting state + fn pause_sync(&mut self) { + trace!(target: "sync", "Block queue full, pausing sync"); + self.state = SyncState::Waiting; + } + + /// Find something to do for a peer. Called for a new peer or when a peer is done with it's task. + fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { + let (peer_latest, peer_difficulty) = { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + if peer.asking != PeerAsking::Nothing { + return; + } + if self.state == SyncState::Waiting { + trace!(target: "sync", "Waiting for block queue"); + return; + } + (peer.latest.clone(), peer.difficulty.clone()) + }; + + let td = io.chain().chain_info().pending_total_difficulty; + let syncing_difficulty = max(self.syncing_difficulty, td); + if force || peer_difficulty > syncing_difficulty { + // start sync + self.syncing_difficulty = peer_difficulty; + if self.state == SyncState::Idle || self.state == SyncState::NotSynced { + self.state = SyncState::Blocks; + } + trace!(target: "sync", "Starting sync with better chain"); + self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); + } + else if self.state == SyncState::Blocks { + self.request_blocks(io, peer_id); + } + } + + /// Find some headers or blocks to download for a peer. + fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId) { + self.clear_peer_download(peer_id); + + if io.chain().queue_info().full { + self.pause_sync(); + return; + } + + // check to see if we need to download any block bodies first + let mut needed_bodies: Vec = Vec::new(); + let mut needed_numbers: Vec = Vec::new(); + + if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.last_imported_block + 1 { + for (start, ref items) in self.headers.range_iter() { + if needed_bodies.len() > MAX_BODIES_TO_REQUEST { + break; + } + let mut index: BlockNumber = 0; + while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST { + let block = start + index; + if !self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block) { + needed_bodies.push(items[index as usize].hash.clone()); + needed_numbers.push(block); + self.downloading_bodies.insert(block); + } + index += 1; + } + } + } + if !needed_bodies.is_empty() { + replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers); + self.request_bodies(io, peer_id, needed_bodies); + } + else { + // check if need to download headers + let mut start = 0usize; + if !self.have_common_block { + // download backwards until common block is found 1 header at a time + let chain_info = io.chain().chain_info(); + start = chain_info.best_block_number as usize; + if !self.headers.is_empty() { + start = min(start, self.headers.range_iter().next().unwrap().0 as usize - 1); + } + if start == 0 { + self.have_common_block = true; //reached genesis + self.last_imported_hash = chain_info.genesis_hash; + } + } + if self.have_common_block { + let mut headers: Vec = Vec::new(); + let mut prev = self.last_imported_block + 1; + for (next, ref items) in self.headers.range_iter() { + if !headers.is_empty() { + break; + } + if next <= prev { + prev = next + items.len() as BlockNumber; + continue; + } + let mut block = prev; + while block < next && headers.len() <= MAX_HEADERS_TO_REQUEST { + if !self.downloading_headers.contains(&(block as BlockNumber)) { + headers.push(block as BlockNumber); + self.downloading_headers.insert(block as BlockNumber); + } + block += 1; + } + prev = next + items.len() as BlockNumber; + } + + if !headers.is_empty() { + start = headers[0] as usize; + let count = headers.len(); + replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers); + assert!(!self.headers.have_item(&(start as BlockNumber))); + self.request_headers_by_number(io, peer_id, start as BlockNumber, count, 0, false); + } + } + else { + self.request_headers_by_number(io, peer_id, start as BlockNumber, 1, 0, false); + } + } + } + + /// Clear all blocks/headers marked as being downloaded by a peer. + fn clear_peer_download(&mut self, peer_id: PeerId) { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + for b in &peer.asking_blocks { + self.downloading_headers.remove(&b); + self.downloading_bodies.remove(&b); + } + peer.asking_blocks.clear(); + } + + /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. + fn collect_blocks(&mut self, io: &mut SyncIo) { + if !self.have_common_block || self.headers.is_empty() || self.bodies.is_empty() { + return; + } + + let mut restart = false; + // merge headers and bodies + { + let headers = self.headers.range_iter().next().unwrap(); + let bodies = self.bodies.range_iter().next().unwrap(); + if headers.0 != bodies.0 || headers.0 != self.last_imported_block + 1 { + return; + } + + let count = min(headers.1.len(), bodies.1.len()); + let mut imported = 0; + for i in 0..count { + let mut block_rlp = RlpStream::new_list(3); + block_rlp.append_raw(&headers.1[i].data, 1); + let body = Rlp::new(&bodies.1[i]); + block_rlp.append_raw(body.at(0).as_raw(), 1); + block_rlp.append_raw(body.at(1).as_raw(), 1); + let h = &headers.1[i].hash; + match io.chain().import_block(block_rlp.out()) { + Err(ImportError::AlreadyInChain) => { + trace!(target: "sync", "Block already in chain {:?}", h); + self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = h.clone(); + }, + Err(ImportError::AlreadyQueued) => { + trace!(target: "sync", "Block already queued {:?}", h); + self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = h.clone(); + }, + Ok(_) => { + trace!(target: "sync", "Block queued {:?}", h); + self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = h.clone(); + imported += 1; + }, + Err(e) => { + debug!(target: "sync", "Bad block {:?} : {:?}", h, e); + restart = true; + } + } + } + trace!(target: "sync", "Imported {} of {}", imported, count); + } + + if restart { + self.restart(io); + return; + } + + self.headers.remove_head(&(self.last_imported_block + 1)); + self.bodies.remove_head(&(self.last_imported_block + 1)); + + if self.headers.is_empty() { + assert!(self.bodies.is_empty()); + self.complete_sync(); + } + } + + /// Remove downloaded bocks/headers starting from specified number. + /// Used to recover from an error and re-download parts of the chain detected as bad. + fn remove_downloaded_blocks(&mut self, start: BlockNumber) { + for n in self.headers.get_tail(&start) { + if let Some(ref header_data) = self.headers.find_item(&n) { + let header_to_delete = HeaderView::new(&header_data.data); + let header_id = HeaderId { + transactions_root: header_to_delete.transactions_root(), + uncles: header_to_delete.uncles_hash() + }; + self.header_ids.remove(&header_id); + } + self.downloading_bodies.remove(&n); + self.downloading_headers.remove(&n); + } + self.headers.remove_tail(&start); + self.bodies.remove_tail(&start); + } + + /// Request headers from a peer by block hash + fn request_headers_by_hash(&mut self, sync: &mut SyncIo, peer_id: PeerId, h: &H256, count: usize, skip: usize, reverse: bool) { + trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, h); + let mut rlp = RlpStream::new_list(4); + rlp.append(h); + rlp.append(&count); + rlp.append(&skip); + rlp.append(&if reverse {1u32} else {0u32}); + self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); + } + + /// Request headers from a peer by block number + fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool) { + let mut rlp = RlpStream::new_list(4); + trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, n); + rlp.append(&n); + rlp.append(&count); + rlp.append(&skip); + rlp.append(&if reverse {1u32} else {0u32}); + self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); + } + + /// Request block bodies from a peer + fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec) { + let mut rlp = RlpStream::new_list(hashes.len()); + trace!(target: "sync", "{} <- GetBlockBodies: {} entries", peer_id, hashes.len()); + for h in hashes { + rlp.append(&h); + } + self.send_request(sync, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out()); + } + + /// Reset peer status after request is complete. + fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + if peer.asking != asking { + warn!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); + } + else { + peer.asking = PeerAsking::Nothing; + } + } + + /// Generic request sender + fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { + { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + if peer.asking != PeerAsking::Nothing { + warn!(target:"sync", "Asking {:?} while requesting {:?}", asking, peer.asking); + } + } + match sync.send(peer_id, packet_id, packet) { + Err(e) => { + warn!(target:"sync", "Error sending request: {:?}", e); + sync.disable_peer(peer_id); + self.on_peer_aborting(sync, peer_id); + } + Ok(_) => { + let mut peer = self.peers.get_mut(&peer_id).unwrap(); + peer.asking = asking; + } + } + } + + /// Called when peer sends us new transactions + fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + Ok(()) + } + + /// Send Status message + fn send_status(&mut self, io: &mut SyncIo, peer_id: PeerId) { + let mut packet = RlpStream::new_list(5); + let chain = io.chain().chain_info(); + packet.append(&(PROTOCOL_VERSION as u32)); + packet.append(&NETWORK_ID); //TODO: network id + packet.append(&chain.total_difficulty); + packet.append(&chain.best_block_hash); + packet.append(&chain.genesis_hash); + //TODO: handle timeout for status request + if let Err(e) = io.send(peer_id, STATUS_PACKET, packet.out()) { + warn!(target:"sync", "Error sending status request: {:?}", e); + io.disable_peer(peer_id); + } + } + + /// Respond to GetBlockHeaders request + fn return_block_headers(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + // Packet layout: + // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] + let max_headers: usize = try!(r.val_at(1)); + let skip: usize = try!(r.val_at(2)); + let reverse: bool = try!(r.val_at(3)); + let last = io.chain().chain_info().best_block_number; + let mut number = if try!(r.at(0)).size() == 32 { + // id is a hash + let hash: H256 = try!(r.val_at(0)); + trace!(target: "sync", "-> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse); + match io.chain().block_header(&hash) { + Some(hdr) => From::from(HeaderView::new(&hdr).number()), + None => last + } + } + else { + trace!(target: "sync", "-> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", try!(r.val_at::(0)), max_headers, skip, reverse); + try!(r.val_at(0)) + }; + + if reverse { + number = min(last, number); + } else { + number = max(1, number); + } + let max_count = min(MAX_HEADERS_TO_SEND, max_headers); + let mut count = 0; + let mut data = Bytes::new(); + let inc = (skip + 1) as BlockNumber; + while number <= last && number > 0 && count < max_count { + if let Some(mut hdr) = io.chain().block_header_at(number) { + data.append(&mut hdr); + count += 1; + } + if reverse { + if number <= inc { + break; + } + number -= inc; + } + else { + number += inc; + } + } + let mut rlp = RlpStream::new_list(count as usize); + rlp.append_raw(&data, count as usize); + io.respond(BLOCK_HEADERS_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); + trace!(target: "sync", "-> GetBlockHeaders: returned {} entries", count); + Ok(()) + } + + /// Respond to GetBlockBodies request + fn return_block_bodies(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let mut count = r.item_count(); + if count == 0 { + debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); + return Ok(()); + } + trace!(target: "sync", "-> GetBlockBodies: {} entries", count); + count = min(count, MAX_BODIES_TO_SEND); + let mut added = 0usize; + let mut data = Bytes::new(); + for i in 0..count { + if let Some(mut hdr) = io.chain().block_body(&try!(r.val_at::(i))) { + data.append(&mut hdr); + added += 1; + } + } + let mut rlp = RlpStream::new_list(added); + rlp.append_raw(&data, added); + io.respond(BLOCK_BODIES_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); + trace!(target: "sync", "-> GetBlockBodies: returned {} entries", added); + Ok(()) + } + + /// Respond to GetNodeData request + fn return_node_data(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let mut count = r.item_count(); + if count == 0 { + debug!(target: "sync", "Empty GetNodeData request, ignoring."); + return Ok(()); + } + count = min(count, MAX_NODE_DATA_TO_SEND); + let mut added = 0usize; + let mut data = Bytes::new(); + for i in 0..count { + if let Some(mut hdr) = io.chain().state_data(&try!(r.val_at::(i))) { + data.append(&mut hdr); + added += 1; + } + } + let mut rlp = RlpStream::new_list(added); + rlp.append_raw(&data, added); + io.respond(NODE_DATA_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); + Ok(()) + } + + /// Respond to GetReceipts request + fn return_receipts(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let mut count = r.item_count(); + if count == 0 { + debug!(target: "sync", "Empty GetReceipts request, ignoring."); + return Ok(()); + } + count = min(count, MAX_RECEIPTS_TO_SEND); + let mut added = 0usize; + let mut data = Bytes::new(); + for i in 0..count { + if let Some(mut hdr) = io.chain().block_receipts(&try!(r.val_at::(i))) { + data.append(&mut hdr); + added += 1; + } + } + let mut rlp = RlpStream::new_list(added); + rlp.append_raw(&data, added); + io.respond(RECEIPTS_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); + Ok(()) + } + + /// Dispatch incoming requests and responses + pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { + let rlp = UntrustedRlp::new(data); + let result = match packet_id { + STATUS_PACKET => self.on_peer_status(io, peer, &rlp), + TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), + GET_BLOCK_HEADERS_PACKET => self.return_block_headers(io, &rlp), + BLOCK_HEADERS_PACKET => self.on_peer_block_headers(io, peer, &rlp), + GET_BLOCK_BODIES_PACKET => self.return_block_bodies(io, &rlp), + BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp), + NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp), + NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), + GET_NODE_DATA_PACKET => self.return_node_data(io, &rlp), + GET_RECEIPTS_PACKET => self.return_receipts(io, &rlp), + _ => { + debug!(target: "sync", "Unknown packet {}", packet_id); + Ok(()) + } + }; + result.unwrap_or_else(|e| { + debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); + }) + } + + /// Maintain other peers. Send out any new blocks and transactions + pub fn _maintain_sync(&mut self, _io: &mut SyncIo) { + } +} + diff --git a/sync/src/io.rs b/sync/src/io.rs new file mode 100644 index 000000000..4425a2555 --- /dev/null +++ b/sync/src/io.rs @@ -0,0 +1,62 @@ +use ethcore::client::BlockChainClient; +use util::{NetworkContext, PeerId, PacketId,}; +use util::error::UtilError; +use ethcore::service::SyncMessage; + +/// IO interface for the syning handler. +/// Provides peer connection management and an interface to the blockchain client. +// TODO: ratings +pub trait SyncIo { + /// Disable a peer + fn disable_peer(&mut self, peer_id: PeerId); + /// Respond to current request with a packet. Can be called from an IO handler for incoming packet. + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError>; + /// Send a packet to a peer. + fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError>; + /// Get the blockchain + fn chain(&self) -> &BlockChainClient; + /// Returns peer client identifier string + fn peer_info(&self, peer_id: PeerId) -> String { + peer_id.to_string() + } +} + +/// Wraps `NetworkContext` and the blockchain client +pub struct NetSyncIo<'s, 'h> where 'h: 's { + network: &'s NetworkContext<'h, SyncMessage>, + chain: &'s BlockChainClient +} + +impl<'s, 'h> NetSyncIo<'s, 'h> { + /// Creates a new instance from the `NetworkContext` and the blockchain client reference. + pub fn new(network: &'s NetworkContext<'h, SyncMessage>, chain: &'s BlockChainClient) -> NetSyncIo<'s, 'h> { + NetSyncIo { + network: network, + chain: chain, + } + } +} + +impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> { + fn disable_peer(&mut self, peer_id: PeerId) { + self.network.disable_peer(peer_id); + } + + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError>{ + self.network.respond(packet_id, data) + } + + fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError>{ + self.network.send(peer_id, packet_id, data) + } + + fn chain(&self) -> &BlockChainClient { + self.chain + } + + fn peer_info(&self, peer_id: PeerId) -> String { + self.network.peer_info(peer_id) + } +} + + diff --git a/sync/src/lib.rs b/sync/src/lib.rs new file mode 100644 index 000000000..09f3eb521 --- /dev/null +++ b/sync/src/lib.rs @@ -0,0 +1,106 @@ +#![warn(missing_docs)] +#![feature(plugin)] +#![plugin(clippy)] +#![feature(augmented_assignments)] +//! Blockchain sync module +//! Implements ethereum protocol version 63 as specified here: +//! https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol +//! +//! Usage example: +//! +//! ```rust +//! extern crate ethcore_util as util; +//! extern crate ethcore; +//! extern crate ethsync; +//! use std::env; +//! use std::sync::Arc; +//! use util::network::{NetworkService, NetworkConfiguration}; +//! use ethcore::client::Client; +//! use ethsync::EthSync; +//! use ethcore::ethereum; +//! +//! fn main() { +//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); +//! let dir = env::temp_dir(); +//! let client = Client::new(ethereum::new_frontier(), &dir, service.io().channel()).unwrap(); +//! EthSync::register(&mut service, client); +//! } +//! ``` + +#[macro_use] +extern crate log; +#[macro_use] +extern crate ethcore_util as util; +extern crate ethcore; +extern crate env_logger; + +use std::ops::*; +use std::sync::*; +use ethcore::client::Client; +use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; +use chain::ChainSync; +use ethcore::service::SyncMessage; +use io::NetSyncIo; + +mod chain; +mod io; +mod range_collection; + +#[cfg(test)] +mod tests; + +/// Ethereum network protocol handler +pub struct EthSync { + /// Shared blockchain client. TODO: this should evetually become an IPC endpoint + chain: Arc, + /// Sync strategy + sync: RwLock +} + +pub use self::chain::SyncStatus; + +impl EthSync { + /// Creates and register protocol with the network service + pub fn register(service: &mut NetworkService, chain: Arc) -> Arc { + let sync = Arc::new(EthSync { + chain: chain, + sync: RwLock::new(ChainSync::new()), + }); + service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); + sync + } + + /// Get sync status + pub fn status(&self) -> SyncStatus { + self.sync.read().unwrap().status() + } + + /// Stop sync + pub fn stop(&mut self, io: &mut NetworkContext) { + self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref())); + } + + /// Restart sync + pub fn restart(&mut self, io: &mut NetworkContext) { + self.sync.write().unwrap().restart(&mut NetSyncIo::new(io, self.chain.deref())); + } +} + +impl NetworkProtocolHandler for EthSync { + fn initialize(&self, _io: &NetworkContext) { + } + + fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { + self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data); + } + + fn connected(&self, io: &NetworkContext, peer: &PeerId) { + self.sync.write().unwrap().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer); + } + + fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { + self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer); + } +} + + diff --git a/sync/src/range_collection.rs b/sync/src/range_collection.rs new file mode 100644 index 000000000..b8186e5a5 --- /dev/null +++ b/sync/src/range_collection.rs @@ -0,0 +1,260 @@ +/// This module defines a trait for a collection of ranged values and an implementation +/// for this trait over sorted vector. + +use std::ops::{Add, Sub, Range}; + +pub trait ToUsize { + fn to_usize(&self) -> usize; +} + +pub trait FromUsize { + fn from_usize(s: usize) -> Self; +} + +/// A key-value collection orderd by key with sequential key-value pairs grouped together. +/// Such group is called a range. +/// E.g. a set of collection of 5 pairs {1, a}, {2, b}, {10, x}, {11, y}, {12, z} will be grouped into two ranges: {1, [a,b]}, {10, [x,y,z]} +pub trait RangeCollection { + /// Check if the given key is present in the collection. + fn have_item(&self, key: &K) -> bool; + /// Get value by key. + fn find_item(&self, key: &K) -> Option<&V>; + /// Get a range of keys from `key` till the end of the range that has `key` + /// Returns an empty range is key does not exist. + fn get_tail(&mut self, key: &K) -> Range; + /// Remove all elements < `start` in the range that contains `start` - 1 + fn remove_head(&mut self, start: &K); + /// Remove all elements >= `start` in the range that contains `start` + fn remove_tail(&mut self, start: &K); + /// Remove all elements >= `tail` + fn insert_item(&mut self, key: K, value: V); + /// Get an iterator over ranges + fn range_iter(& self) -> RangeIterator; +} + +/// Range iterator. For each range yelds a key for the first element of the range and a vector of values. +pub struct RangeIterator<'c, K:'c, V:'c> { + range: usize, + collection: &'c Vec<(K, Vec)> +} + +impl<'c, K:'c, V:'c> Iterator for RangeIterator<'c, K, V> where K: Add + FromUsize + ToUsize + Copy { + type Item = (K, &'c [V]); + // The 'Iterator' trait only requires the 'next' method to be defined. The + // return type is 'Option', 'None' is returned when the 'Iterator' is + // over, otherwise the next value is returned wrapped in 'Some' + fn next(&mut self) -> Option<(K, &'c [V])> { + if self.range > 0 { + self.range -= 1; + } + else { + return None; + } + match self.collection.get(self.range) { + Some(&(ref k, ref vec)) => { + Some((*k, &vec)) + }, + None => None + } + } +} + +impl RangeCollection for Vec<(K, Vec)> where K: Ord + PartialEq + Add + Sub + Copy + FromUsize + ToUsize { + fn range_iter(&self) -> RangeIterator { + RangeIterator { + range: self.len(), + collection: self + } + } + + fn have_item(&self, key: &K) -> bool { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(_) => true, + Err(index) => match self.get(index) { + Some(&(ref k, ref v)) => k <= key && (*k + FromUsize::from_usize(v.len())) > *key, + _ => false + }, + } + } + + fn find_item(&self, key: &K) -> Option<&V> { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => self.get(index).unwrap().1.get(0), + Err(index) => match self.get(index) { + Some(&(ref k, ref v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => v.get((*key - *k).to_usize()), + _ => None + }, + } + } + + fn get_tail(&mut self, key: &K) -> Range { + let kv = *key; + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => kv..(kv + FromUsize::from_usize(self[index].1.len())), + Err(index) => { + match self.get_mut(index) { + Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { + kv..(*k + FromUsize::from_usize(v.len())) + } + _ => kv..kv + } + }, + } + } + /// Remove element key and following elements in the same range + fn remove_tail(&mut self, key: &K) { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => { self.remove(index); }, + Err(index) =>{ + let mut empty = false; + match self.get_mut(index) { + Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { + v.truncate((*key - *k).to_usize()); + empty = v.is_empty(); + } + _ => {} + } + if empty { + self.remove(index); + } + }, + } + } + + /// Remove range elements up to key + fn remove_head(&mut self, key: &K) { + if *key == FromUsize::from_usize(0) { + return + } + + let prev = *key - FromUsize::from_usize(1); + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(_) => { }, //start of range, do nothing. + Err(index) => { + let mut empty = false; + match self.get_mut(index) { + Some(&mut (ref mut k, ref mut v)) if *k <= prev && (*k + FromUsize::from_usize(v.len())) > prev => { + let tail = v.split_off((*key - *k).to_usize()); + empty = tail.is_empty(); + let removed = ::std::mem::replace(v, tail); + let new_k = *k + FromUsize::from_usize(removed.len()); + ::std::mem::replace(k, new_k); + } + _ => {} + } + if empty { + self.remove(index); + } + }, + } + } + + fn insert_item(&mut self, key: K, value: V) { + assert!(!self.have_item(&key)); + + let lower = match self.binary_search_by(|&(k, _)| k.cmp(&key).reverse()) { + Ok(index) => index, + Err(index) => index, + }; + + let mut to_remove: Option = None; + if lower < self.len() && self[lower].0 + FromUsize::from_usize(self[lower].1.len()) == key { + // extend into existing chunk + self[lower].1.push(value); + } + else { + // insert a new chunk + let range: Vec = vec![value]; + self.insert(lower, (key, range)); + }; + if lower > 0 { + let next = lower - 1; + if next < self.len() + { + { + let (mut next, mut inserted) = self.split_at_mut(lower); + let mut next = next.last_mut().unwrap(); + let mut inserted = inserted.first_mut().unwrap(); + if next.0 == key + FromUsize::from_usize(1) + { + inserted.1.append(&mut next.1); + to_remove = Some(lower - 1); + } + } + + if let Some(r) = to_remove { + self.remove(r); + } + } + } + } +} + +#[test] +#[allow(cyclomatic_complexity)] +fn test_range() { + use std::cmp::{Ordering}; + + let mut ranges: Vec<(u64, Vec)> = Vec::new(); + assert_eq!(ranges.range_iter().next(), None); + assert_eq!(ranges.find_item(&1), None); + assert!(!ranges.have_item(&1)); + assert_eq!(ranges.get_tail(&0), 0..0); + + ranges.insert_item(17, 'q'); + assert_eq!(ranges.range_iter().cmp(vec![(17, &['q'][..])]), Ordering::Equal); + assert_eq!(ranges.find_item(&17), Some(&'q')); + assert!(ranges.have_item(&17)); + assert_eq!(ranges.get_tail(&17), 17..18); + + ranges.insert_item(18, 'r'); + assert_eq!(ranges.range_iter().cmp(vec![(17, &['q', 'r'][..])]), Ordering::Equal); + assert_eq!(ranges.find_item(&18), Some(&'r')); + assert!(ranges.have_item(&18)); + assert_eq!(ranges.get_tail(&17), 17..19); + + ranges.insert_item(16, 'p'); + assert_eq!(ranges.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal); + assert_eq!(ranges.find_item(&16), Some(&'p')); + assert_eq!(ranges.find_item(&17), Some(&'q')); + assert_eq!(ranges.find_item(&18), Some(&'r')); + assert!(ranges.have_item(&16)); + assert_eq!(ranges.get_tail(&17), 17..19); + assert_eq!(ranges.get_tail(&16), 16..19); + + ranges.insert_item(2, 'b'); + assert_eq!(ranges.range_iter().cmp(vec![(2, &['b'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + assert_eq!(ranges.find_item(&2), Some(&'b')); + + ranges.insert_item(3, 'c'); + ranges.insert_item(4, 'd'); + assert_eq!(ranges.get_tail(&3), 3..5); + assert_eq!(ranges.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + + let mut r = ranges.clone(); + r.remove_head(&1); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&2); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&3); + assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&10); + assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&5); + assert_eq!(r.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&19); + assert_eq!(r.range_iter().next(), None); + + let mut r = ranges.clone(); + r.remove_tail(&20); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_tail(&17); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal); + r.remove_tail(&16); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal); + r.remove_tail(&3); + assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal); + r.remove_tail(&2); + assert_eq!(r.range_iter().next(), None); +} + diff --git a/sync/src/service.rs b/sync/src/service.rs new file mode 100644 index 000000000..8c900d20a --- /dev/null +++ b/sync/src/service.rs @@ -0,0 +1,104 @@ +use util::*; +use sync::*; +use spec::Spec; +use error::*; +use std::env; +use client::Client; + +/// Message type for external and internal events +#[derive(Clone)] +pub enum SyncMessage { + /// New block has been imported into the blockchain + NewChainBlock(Bytes), //TODO: use Cow + /// A block is ready + BlockVerified, +} + +/// TODO [arkpar] Please document me +pub type NetSyncMessage = NetworkIoMessage; + +/// Client service setup. Creates and registers client and network services with the IO subsystem. +pub struct ClientService { + net_service: NetworkService, + client: Arc, + sync: Arc, +} + +impl ClientService { + /// Start the service in a separate thread. + pub fn start(spec: Spec, net_config: NetworkConfiguration) -> Result { + let mut net_service = try!(NetworkService::start(net_config)); + info!("Starting {}", net_service.host_info()); + info!("Configured for {} using {} engine", spec.name, spec.engine_name); + let mut dir = env::home_dir().unwrap(); + dir.push(".parity"); + dir.push(H64::from(spec.genesis_header().hash()).hex()); + let client = try!(Client::new(spec, &dir, net_service.io().channel())); + let sync = EthSync::register(&mut net_service, client.clone()); + let client_io = Arc::new(ClientIoHandler { + client: client.clone() + }); + try!(net_service.io().register_handler(client_io)); + + Ok(ClientService { + net_service: net_service, + client: client, + sync: sync, + }) + } + + /// Get the network service. + pub fn add_node(&mut self, _enode: &str) { + unimplemented!(); + } + + /// TODO [arkpar] Please document me + pub fn io(&mut self) -> &mut IoService { + self.net_service.io() + } + + /// TODO [arkpar] Please document me + pub fn client(&self) -> Arc { + self.client.clone() + + } + + /// Get shared sync handler + pub fn sync(&self) -> Arc { + self.sync.clone() + } +} + +/// IO interface for the Client handler +struct ClientIoHandler { + client: Arc +} + +const CLIENT_TICK_TIMER: TimerToken = 0; +const CLIENT_TICK_MS: u64 = 5000; + +impl IoHandler for ClientIoHandler { + fn initialize(&self, io: &IoContext) { + io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer"); + } + + fn timeout(&self, _io: &IoContext, timer: TimerToken) { + if timer == CLIENT_TICK_TIMER { + self.client.tick(); + } + } + + #[allow(match_ref_pats)] + #[allow(single_match)] + fn message(&self, io: &IoContext, net_message: &NetSyncMessage) { + if let &UserMessage(ref message) = net_message { + match message { + &SyncMessage::BlockVerified => { + self.client.import_verified_blocks(&io.channel()); + }, + _ => {}, // ignore other messages + } + } + } +} + diff --git a/sync/src/tests.rs b/sync/src/tests.rs new file mode 100644 index 000000000..070ae566d --- /dev/null +++ b/sync/src/tests.rs @@ -0,0 +1,349 @@ +use util::*; +use ethcore::client::{BlockChainClient, BlockStatus, TreeRoute, BlockChainInfo}; +use ethcore::block_queue::BlockQueueInfo; +use ethcore::header::{Header as BlockHeader, BlockNumber}; +use ethcore::error::*; +use io::SyncIo; +use chain::ChainSync; + +struct TestBlockChainClient { + blocks: RwLock>, + numbers: RwLock>, + genesis_hash: H256, + last_hash: RwLock, + difficulty: RwLock, +} + +impl TestBlockChainClient { + fn new() -> TestBlockChainClient { + + let mut client = TestBlockChainClient { + blocks: RwLock::new(HashMap::new()), + numbers: RwLock::new(HashMap::new()), + genesis_hash: H256::new(), + last_hash: RwLock::new(H256::new()), + difficulty: RwLock::new(From::from(0)), + }; + client.add_blocks(1, true); // add genesis block + client.genesis_hash = client.last_hash.read().unwrap().clone(); + client + } + + pub fn add_blocks(&mut self, count: usize, empty: bool) { + let len = self.numbers.read().unwrap().len(); + for n in len..(len + count) { + let mut header = BlockHeader::new(); + header.difficulty = From::from(n); + header.parent_hash = self.last_hash.read().unwrap().clone(); + header.number = n as BlockNumber; + let mut uncles = RlpStream::new_list(if empty {0} else {1}); + if !empty { + uncles.append(&H256::from(&U256::from(n))); + header.uncles_hash = uncles.as_raw().sha3(); + } + let mut rlp = RlpStream::new_list(3); + rlp.append(&header); + rlp.append_raw(&rlp::NULL_RLP, 1); + rlp.append_raw(uncles.as_raw(), 1); + self.import_block(rlp.as_raw().to_vec()).unwrap(); + } + } +} + +impl BlockChainClient for TestBlockChainClient { + fn block_total_difficulty(&self, _h: &H256) -> Option { + unimplemented!(); + } + + fn block_header(&self, h: &H256) -> Option { + self.blocks.read().unwrap().get(h).map(|r| Rlp::new(r).at(0).as_raw().to_vec()) + + } + + fn block_body(&self, h: &H256) -> Option { + self.blocks.read().unwrap().get(h).map(|r| { + let mut stream = RlpStream::new_list(2); + stream.append_raw(Rlp::new(&r).at(1).as_raw(), 1); + stream.append_raw(Rlp::new(&r).at(2).as_raw(), 1); + stream.out() + }) + } + + fn block(&self, h: &H256) -> Option { + self.blocks.read().unwrap().get(h).cloned() + } + + fn block_status(&self, h: &H256) -> BlockStatus { + match self.blocks.read().unwrap().get(h) { + Some(_) => BlockStatus::InChain, + None => BlockStatus::Unknown + } + } + + fn block_total_difficulty_at(&self, _number: BlockNumber) -> Option { + unimplemented!(); + } + + fn block_header_at(&self, n: BlockNumber) -> Option { + self.numbers.read().unwrap().get(&(n as usize)).and_then(|h| self.block_header(h)) + } + + fn block_body_at(&self, n: BlockNumber) -> Option { + self.numbers.read().unwrap().get(&(n as usize)).and_then(|h| self.block_body(h)) + } + + fn block_at(&self, n: BlockNumber) -> Option { + self.numbers.read().unwrap().get(&(n as usize)).map(|h| self.blocks.read().unwrap().get(h).unwrap().clone()) + } + + fn block_status_at(&self, n: BlockNumber) -> BlockStatus { + if (n as usize) < self.blocks.read().unwrap().len() { + BlockStatus::InChain + } else { + BlockStatus::Unknown + } + } + + fn tree_route(&self, _from: &H256, _to: &H256) -> Option { + Some(TreeRoute { + blocks: Vec::new(), + ancestor: H256::new(), + index: 0 + }) + } + + fn state_data(&self, _h: &H256) -> Option { + None + } + + fn block_receipts(&self, _h: &H256) -> Option { + None + } + + fn import_block(&self, b: Bytes) -> ImportResult { + let header = Rlp::new(&b).val_at::(0); + let h = header.hash(); + let number: usize = header.number as usize; + if number > self.blocks.read().unwrap().len() { + panic!("Unexpected block number. Expected {}, got {}", self.blocks.read().unwrap().len(), number); + } + if number > 0 { + match self.blocks.read().unwrap().get(&header.parent_hash) { + Some(parent) => { + let parent = Rlp::new(parent).val_at::(0); + if parent.number != (header.number - 1) { + panic!("Unexpected block parent"); + } + }, + None => { + panic!("Unknown block parent {:?} for block {}", header.parent_hash, number); + } + } + } + let len = self.numbers.read().unwrap().len(); + if number == len { + *self.difficulty.write().unwrap().deref_mut() += header.difficulty; + mem::replace(self.last_hash.write().unwrap().deref_mut(), h.clone()); + self.blocks.write().unwrap().insert(h.clone(), b); + self.numbers.write().unwrap().insert(number, h.clone()); + let mut parent_hash = header.parent_hash; + if number > 0 { + let mut n = number - 1; + while n > 0 && self.numbers.read().unwrap()[&n] != parent_hash { + *self.numbers.write().unwrap().get_mut(&n).unwrap() = parent_hash.clone(); + n -= 1; + parent_hash = Rlp::new(&self.blocks.read().unwrap()[&parent_hash]).val_at::(0).parent_hash; + } + } + } + else { + self.blocks.write().unwrap().insert(h.clone(), b.to_vec()); + } + Ok(h) + } + + fn queue_info(&self) -> BlockQueueInfo { + BlockQueueInfo { + full: false, + verified_queue_size: 0, + unverified_queue_size: 0, + verifying_queue_size: 0, + } + } + + fn clear_queue(&self) { + } + + fn chain_info(&self) -> BlockChainInfo { + BlockChainInfo { + total_difficulty: *self.difficulty.read().unwrap(), + pending_total_difficulty: *self.difficulty.read().unwrap(), + genesis_hash: self.genesis_hash.clone(), + best_block_hash: self.last_hash.read().unwrap().clone(), + best_block_number: self.blocks.read().unwrap().len() as BlockNumber - 1, + } + } +} + +struct TestIo<'p> { + chain: &'p mut TestBlockChainClient, + queue: &'p mut VecDeque, + sender: Option, +} + +impl<'p> TestIo<'p> { + fn new(chain: &'p mut TestBlockChainClient, queue: &'p mut VecDeque, sender: Option) -> TestIo<'p> { + TestIo { + chain: chain, + queue: queue, + sender: sender + } + } +} + +impl<'p> SyncIo for TestIo<'p> { + fn disable_peer(&mut self, _peer_id: PeerId) { + } + + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { + self.queue.push_back(TestPacket { + data: data, + packet_id: packet_id, + recipient: self.sender.unwrap() + }); + Ok(()) + } + + fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { + self.queue.push_back(TestPacket { + data: data, + packet_id: packet_id, + recipient: peer_id, + }); + Ok(()) + } + + fn chain(&self) -> &BlockChainClient { + self.chain + } +} + +struct TestPacket { + data: Bytes, + packet_id: PacketId, + recipient: PeerId, +} + +struct TestPeer { + chain: TestBlockChainClient, + sync: ChainSync, + queue: VecDeque, +} + +struct TestNet { + peers: Vec +} + +impl TestNet { + pub fn new(n: usize) -> TestNet { + let mut net = TestNet { + peers: Vec::new(), + }; + for _ in 0..n { + net.peers.push(TestPeer { + chain: TestBlockChainClient::new(), + sync: ChainSync::new(), + queue: VecDeque::new(), + }); + } + net + } + + pub fn peer(&self, i: usize) -> &TestPeer { + self.peers.get(i).unwrap() + } + + pub fn peer_mut(&mut self, i: usize) -> &mut TestPeer { + self.peers.get_mut(i).unwrap() + } + + pub fn start(&mut self) { + for peer in 0..self.peers.len() { + for client in 0..self.peers.len() { + if peer != client { + let mut p = self.peers.get_mut(peer).unwrap(); + p.sync.on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId); + } + } + } + } + + pub fn sync_step(&mut self) { + for peer in 0..self.peers.len() { + if let Some(packet) = self.peers[peer].queue.pop_front() { + let mut p = self.peers.get_mut(packet.recipient).unwrap(); + trace!("--- {} -> {} ---", peer, packet.recipient); + p.sync.on_packet(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data); + trace!("----------------"); + } + let mut p = self.peers.get_mut(peer).unwrap(); + p.sync._maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); + } + } + + pub fn sync(&mut self) { + self.start(); + while !self.done() { + self.sync_step() + } + } + + pub fn done(&self) -> bool { + self.peers.iter().all(|p| p.queue.is_empty()) + } +} + + +#[test] +fn full_sync_two_peers() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + net.peer_mut(1).chain.add_blocks(1000, false); + net.peer_mut(2).chain.add_blocks(1000, false); + net.sync(); + assert!(net.peer(0).chain.block_at(1000).is_some()); + assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); +} + +#[test] +fn full_sync_empty_blocks() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + for n in 0..200 { + net.peer_mut(1).chain.add_blocks(5, n % 2 == 0); + net.peer_mut(2).chain.add_blocks(5, n % 2 == 0); + } + net.sync(); + assert!(net.peer(0).chain.block_at(1000).is_some()); + assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); +} + +#[test] +fn forked_sync() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + net.peer_mut(0).chain.add_blocks(300, false); + net.peer_mut(1).chain.add_blocks(300, false); + net.peer_mut(2).chain.add_blocks(300, false); + net.peer_mut(0).chain.add_blocks(100, true); //fork + net.peer_mut(1).chain.add_blocks(200, false); + net.peer_mut(2).chain.add_blocks(200, false); + net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2 + net.peer_mut(2).chain.add_blocks(10, true); + // peer 1 has the best chain of 601 blocks + let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); + net.sync(); + assert_eq!(net.peer(0).chain.numbers.read().unwrap().deref(), &peer1_chain); + assert_eq!(net.peer(1).chain.numbers.read().unwrap().deref(), &peer1_chain); + assert_eq!(net.peer(2).chain.numbers.read().unwrap().deref(), &peer1_chain); +} diff --git a/src/sync/tests.rs b/sync/tests.rs similarity index 100% rename from src/sync/tests.rs rename to sync/tests.rs