diff --git a/ethcore/src/spec.rs b/ethcore/src/spec.rs index 5714ca734..38a0dda53 100644 --- a/ethcore/src/spec.rs +++ b/ethcore/src/spec.rs @@ -58,6 +58,8 @@ pub struct Spec { /// Known nodes on the network in enode format. pub nodes: Vec, + /// Network ID + pub network_id: U256, /// Parameters concerning operation of the specific engine we're using. /// Maps the parameter name to an RLP-encoded value. @@ -120,6 +122,9 @@ impl Spec { /// Get the known knodes of the network in enode format. pub fn nodes(&self) -> &Vec { &self.nodes } + /// Get the configured Network ID. + pub fn network_id(&self) -> U256 { self.network_id } + /// Get the header of the genesis block. pub fn genesis_header(&self) -> Header { Header { @@ -250,6 +255,7 @@ impl FromJson for Spec { engine_name: json["engineName"].as_string().unwrap().to_owned(), engine_params: json_to_rlp_map(&json["params"]), nodes: nodes, + network_id: U256::from_str(&json["params"]["networkID"].as_string().unwrap()[2..]).unwrap(), builtins: builtins, parent_hash: H256::from_str(&genesis["parentHash"].as_string().unwrap()[2..]).unwrap(), author: Address::from_str(&genesis["author"].as_string().unwrap()[2..]).unwrap(), diff --git a/parity/main.rs b/parity/main.rs index 1b6a59a93..e95f38f13 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -48,7 +48,7 @@ use ethcore::client::*; use ethcore::service::{ClientService, NetSyncMessage}; use ethcore::ethereum; use ethcore::blockchain::CacheSize; -use ethsync::EthSync; +use ethsync::{EthSync, SyncConfig}; use docopt::Docopt; use daemonize::Daemonize; @@ -281,6 +281,8 @@ impl Configuration { let spec = self.spec(); let net_settings = self.net_settings(&spec); + let mut sync_config = SyncConfig::default(); + sync_config.network_id = spec.network_id(); // Build client let mut service = ClientService::start(spec, net_settings, &Path::new(&self.path())).unwrap(); @@ -288,7 +290,7 @@ impl Configuration { client.configure_cache(self.args.flag_cache_pref_size, self.args.flag_cache_max_size); // Sync - let sync = EthSync::register(service.network(), client); + let sync = EthSync::register(service.network(), sync_config, client); // Setup rpc if self.args.flag_jsonrpc { diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 5c79e08b6..bea17c177 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -40,6 +40,7 @@ use ethcore::block::Block; use io::SyncIo; use time; use std::option::Option; +use super::SyncConfig; impl ToUsize for BlockNumber { fn to_usize(&self) -> usize { @@ -80,9 +81,7 @@ const NODE_DATA_PACKET: u8 = 0x0e; const GET_RECEIPTS_PACKET: u8 = 0x0f; const RECEIPTS_PACKET: u8 = 0x10; -const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent - -const CONNECTION_TIMEOUT_SEC: f64 = 10f64; +const CONNECTION_TIMEOUT_SEC: f64 = 5f64; struct Header { /// Header data @@ -203,13 +202,17 @@ pub struct ChainSync { have_common_block: bool, /// Last propagated block number last_send_block_number: BlockNumber, + /// Max blocks to download ahead + max_download_ahead_blocks: usize, + /// Network ID + network_id: U256, } type RlpResponseResult = Result, PacketDecodeError>; impl ChainSync { /// Create a new instance of syncing strategy. - pub fn new() -> ChainSync { + pub fn new(config: SyncConfig) -> ChainSync { ChainSync { state: SyncState::NotSynced, starting_block: 0, @@ -226,6 +229,8 @@ impl ChainSync { syncing_difficulty: U256::from(0u64), have_common_block: false, last_send_block_number: 0, + max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), + network_id: config.network_id, } } @@ -275,7 +280,6 @@ impl ChainSync { self.starting_block = 0; self.highest_block = None; self.have_common_block = false; - io.chain().clear_queue(); self.starting_block = io.chain().chain_info().best_block_number; self.state = SyncState::NotSynced; } @@ -307,7 +311,7 @@ impl ChainSync { trace!(target: "sync", "Peer {} genesis hash not matched", peer_id); return Ok(()); } - if peer.network_id != NETWORK_ID { + if peer.network_id != self.network_id { io.disable_peer(peer_id); trace!(target: "sync", "Peer {} network id not matched", peer_id); return Ok(()); @@ -436,7 +440,7 @@ impl ChainSync { trace!(target: "sync", "Got body {}", n); } None => { - debug!(target: "sync", "Ignored unknown block body"); + trace!(target: "sync", "Ignored unknown/stale block body"); } } } @@ -608,7 +612,7 @@ impl ChainSync { self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); } else if self.state == SyncState::Blocks && io.chain().block_status(BlockId::Hash(peer_latest)) == BlockStatus::Unknown { - self.request_blocks(io, peer_id); + self.request_blocks(io, peer_id, false); } } @@ -617,7 +621,7 @@ impl ChainSync { } /// Find some headers or blocks to download for a peer. - fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId) { + fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, ignore_others: bool) { self.clear_peer_download(peer_id); if io.chain().queue_info().is_full() { @@ -637,28 +641,34 @@ impl ChainSync { let mut index: BlockNumber = 0; while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST { let block = start + index; - if !self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block) { + if ignore_others || (!self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block)) { needed_bodies.push(items[index as usize].hash.clone()); needed_numbers.push(block); - self.downloading_bodies.insert(block); } index += 1; } } } if !needed_bodies.is_empty() { + let (head, _) = self.headers.range_iter().next().unwrap(); + if needed_numbers.first().unwrap() - head > self.max_download_ahead_blocks as BlockNumber { + trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading block bodies", peer_id, needed_numbers.first().unwrap(), head); + self.request_blocks(io, peer_id, true); + return; + } + self.downloading_bodies.extend(needed_numbers.iter()); replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers); self.request_bodies(io, peer_id, needed_bodies); } else { // check if need to download headers - let mut start = 0usize; + let mut start = 0; if !self.have_common_block { // download backwards until common block is found 1 header at a time let chain_info = io.chain().chain_info(); - start = chain_info.best_block_number as usize; + start = chain_info.best_block_number; if !self.headers.is_empty() { - start = min(start, self.headers.range_iter().next().unwrap().0 as usize - 1); + start = min(start, self.headers.range_iter().next().unwrap().0 - 1); } if start == 0 { self.have_common_block = true; //reached genesis @@ -669,6 +679,7 @@ impl ChainSync { if self.have_common_block { let mut headers: Vec = Vec::new(); let mut prev = self.current_base_block() + 1; + let head = self.headers.range_iter().next().map(|(h, _)| h); for (next, ref items) in self.headers.range_iter() { if !headers.is_empty() { break; @@ -679,9 +690,8 @@ impl ChainSync { } let mut block = prev; while block < next && headers.len() < MAX_HEADERS_TO_REQUEST { - if !self.downloading_headers.contains(&(block as BlockNumber)) { + if ignore_others || !self.downloading_headers.contains(&(block as BlockNumber)) { headers.push(block as BlockNumber); - self.downloading_headers.insert(block as BlockNumber); } block += 1; } @@ -689,17 +699,23 @@ impl ChainSync { } if !headers.is_empty() { - start = headers[0] as usize; + start = headers[0]; + if head.is_some() && start > head.unwrap() && start - head.unwrap() > self.max_download_ahead_blocks as BlockNumber { + trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading headers", peer_id, start, head.unwrap()); + self.request_blocks(io, peer_id, true); + return; + } let count = headers.len(); + self.downloading_headers.extend(headers.iter()); replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers); - assert!(!self.headers.have_item(&(start as BlockNumber))); - self.request_headers_by_number(io, peer_id, start as BlockNumber, count, 0, false); + assert!(!self.headers.have_item(&start)); + self.request_headers_by_number(io, peer_id, start, count, 0, false); } } else { // continue search for common block - self.downloading_headers.insert(start as BlockNumber); - self.request_headers_by_number(io, peer_id, start as BlockNumber, 1, 0, false); + self.downloading_headers.insert(start); + self.request_headers_by_number(io, peer_id, start, 1, 0, false); } } } @@ -891,7 +907,7 @@ impl ChainSync { let mut packet = RlpStream::new_list(5); let chain = io.chain().chain_info(); packet.append(&(PROTOCOL_VERSION as u32)); - packet.append(&NETWORK_ID); //TODO: network id + packet.append(&self.network_id); packet.append(&chain.total_difficulty); packet.append(&chain.best_block_hash); packet.append(&chain.genesis_hash); @@ -1221,6 +1237,7 @@ impl ChainSync { mod tests { use tests::helpers::*; use super::*; + use ::SyncConfig; use util::*; use super::{PeerInfo, PeerAsking}; use ethcore::header::*; @@ -1334,7 +1351,7 @@ mod tests { } fn dummy_sync_with_peer(peer_latest_hash: H256) -> ChainSync { - let mut sync = ChainSync::new(); + let mut sync = ChainSync::new(SyncConfig::default()); sync.peers.insert(0, PeerInfo { protocol_version: 0, diff --git a/sync/src/lib.rs b/sync/src/lib.rs index fd586409a..397a09f47 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -35,14 +35,14 @@ //! use std::sync::Arc; //! use util::network::{NetworkService, NetworkConfiguration}; //! use ethcore::client::Client; -//! use ethsync::EthSync; +//! use ethsync::{EthSync, SyncConfig}; //! use ethcore::ethereum; //! //! fn main() { //! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); //! let dir = env::temp_dir(); //! let client = Client::new(ethereum::new_frontier(), &dir, service.io().channel()).unwrap(); -//! EthSync::register(&mut service, client); +//! EthSync::register(&mut service, SyncConfig::default(), client); //! } //! ``` @@ -60,6 +60,7 @@ use std::sync::*; use ethcore::client::Client; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; use util::TimerToken; +use util::{U256, ONE_U256}; use chain::ChainSync; use ethcore::service::SyncMessage; use io::NetSyncIo; @@ -71,6 +72,23 @@ mod range_collection; #[cfg(test)] mod tests; +/// Sync configuration +pub struct SyncConfig { + /// Max blocks to download ahead + pub max_download_ahead_blocks: usize, + /// Network ID + pub network_id: U256, +} + +impl Default for SyncConfig { + fn default() -> SyncConfig { + SyncConfig { + max_download_ahead_blocks: 20000, + network_id: ONE_U256, + } + } +} + /// Ethereum network protocol handler pub struct EthSync { /// Shared blockchain client. TODO: this should evetually become an IPC endpoint @@ -83,10 +101,10 @@ pub use self::chain::{SyncStatus, SyncState}; impl EthSync { /// Creates and register protocol with the network service - pub fn register(service: &mut NetworkService, chain: Arc) -> Arc { + pub fn register(service: &mut NetworkService, config: SyncConfig, chain: Arc) -> Arc { let sync = Arc::new(EthSync { chain: chain, - sync: RwLock::new(ChainSync::new()), + sync: RwLock::new(ChainSync::new(config)), }); service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); sync diff --git a/sync/src/range_collection.rs b/sync/src/range_collection.rs index c3333ab63..0a1bb6c6f 100644 --- a/sync/src/range_collection.rs +++ b/sync/src/range_collection.rs @@ -207,7 +207,7 @@ impl RangeCollection for Vec<(K, Vec)> where K: Ord + PartialEq + } #[test] -#[allow(cyclomatic_complexity)] +#[cfg_attr(dev, allow(cyclomatic_complexity))] fn test_range() { use std::cmp::{Ordering}; diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 8c8b3b10a..6e92184c8 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -20,7 +20,8 @@ use ethcore::block_queue::BlockQueueInfo; use ethcore::header::{Header as BlockHeader, BlockNumber}; use ethcore::error::*; use io::SyncIo; -use chain::{ChainSync}; +use chain::ChainSync; +use ::SyncConfig; use ethcore::receipt::Receipt; use ethcore::transaction::LocalizedTransaction; @@ -330,7 +331,7 @@ impl TestNet { for _ in 0..n { net.peers.push(TestPeer { chain: TestBlockChainClient::new(), - sync: ChainSync::new(), + sync: ChainSync::new(SyncConfig::default()), queue: VecDeque::new(), }); }