diff --git a/logger/src/lib.rs b/logger/src/lib.rs
index a72ef8755..521c3a2d7 100644
--- a/logger/src/lib.rs
+++ b/logger/src/lib.rs
@@ -30,7 +30,7 @@ use std::env;
 use std::sync::Arc;
 use std::fs::File;
 use std::io::Write;
-use isatty::{stderr_isatty};
+use isatty::{stderr_isatty, stdout_isatty};
 use env_logger::LogBuilder;
 use regex::Regex;
 use util::RotatingLogger;
@@ -89,7 +89,8 @@ pub fn setup_log(settings: &Settings) -> Arc<RotatingLogger> {
 		builder.parse(s);
 	}

-	let enable_color = settings.color && stderr_isatty();
+	let isatty = stderr_isatty();
+	let enable_color = settings.color && isatty;
 	let logs = Arc::new(RotatingLogger::new(levels));
 	let logger = logs.clone();
 	let maybe_file = settings.file.as_ref().map(|f| File::create(f).unwrap_or_else(|_| panic!("Cannot write to log file given: {}", f)));
@@ -115,6 +116,10 @@ pub fn setup_log(settings: &Settings) -> Arc<RotatingLogger> {
 			let _ = file.write_all(b"\n");
 		}
 		logger.append(removed_color);
+		if !isatty && record.level() <= LogLevel::Info && stdout_isatty() {
+			// duplicate INFO/WARN output to console
+			println!("{}", ret);
+		}
 		ret
 	};
diff --git a/parity/informant.rs b/parity/informant.rs
index 2568ca50c..6e45e1e8d 100644
--- a/parity/informant.rs
+++ b/parity/informant.rs
@@ -106,7 +106,7 @@ impl Informant {
 			false => t,
 		};

-		info!("{} {} {}",
+		info!(target: "import", "{} {} {}",
 			match importing {
 				true => format!("{} {} {} {}+{} Qed",
 					paint(White.bold(), format!("{:>8}", format!("#{}", chain_info.best_block_number))),
diff --git a/sync/src/chain.rs b/sync/src/chain.rs
index 28c42cd6a..8a2e75383 100644
--- a/sync/src/chain.rs
+++ b/sync/src/chain.rs
@@ -119,6 +119,7 @@ const SUBCHAIN_SIZE: usize = 64;
 const MAX_ROUND_PARENTS: usize = 32;
 const MAX_NEW_HASHES: usize = 64;
 const MAX_TX_TO_IMPORT: usize = 512;
+const MAX_NEW_BLOCK_AGE: BlockNumber = 20;

 const STATUS_PACKET: u8 = 0x00;
 const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
@@ -134,7 +135,8 @@ const NODE_DATA_PACKET: u8 = 0x0e;
 const GET_RECEIPTS_PACKET: u8 = 0x0f;
 const RECEIPTS_PACKET: u8 = 0x10;

-const CONNECTION_TIMEOUT_SEC: f64 = 15f64;
+const HEADERS_TIMEOUT_SEC: f64 = 15f64;
+const BODIES_TIMEOUT_SEC: f64 = 5f64;

 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
 /// Sync state
@@ -217,6 +219,8 @@ struct PeerInfo {
 	asking_hash: Option<H256>,
 	/// Request timestamp
 	ask_time: f64,
+	/// Pending request is expired and result should be ignored
+	expired: bool,
 }

 /// Blockchain sync handler.
@@ -311,6 +315,10 @@ impl ChainSync {
 		for (_, ref mut p) in &mut self.peers {
 			p.asking_blocks.clear();
 			p.asking_hash = None;
+			// mark any pending requests as expired
+			if p.asking != PeerAsking::Nothing {
+				p.expired = true;
+			}
 		}
 		self.syncing_difficulty = From::from(0u64);
 		self.state = SyncState::Idle;
@@ -361,6 +369,7 @@ impl ChainSync {
 			asking_blocks: Vec::new(),
 			asking_hash: None,
 			ask_time: 0f64,
+			expired: false,
 		};

 		trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
@@ -496,6 +505,9 @@ impl ChainSync {
 		}

 		self.collect_blocks(io);
+		// give a task to the same peer first if received valuable headers.
+		self.sync_peer(io, peer_id, false);
+		// give tasks to other peers
 		self.continue_sync(io);
 		Ok(())
 	}
@@ -548,6 +560,11 @@ impl ChainSync {
 				peer.latest_hash = header.hash();
 				peer.latest_number = Some(header.number());
 			}
+			if self.last_imported_block > header.number() && self.last_imported_block - header.number() > MAX_NEW_BLOCK_AGE {
+				trace!(target: "sync", "Ignored ancient new block {:?}", h);
+				io.disable_peer(peer_id);
+				return Ok(());
+			}
 			match io.chain().import_block(block_rlp.as_raw().to_vec()) {
 				Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {
 					trace!(target: "sync", "New block already in chain {:?}", h);
@@ -605,34 +622,39 @@ impl ChainSync {
 			let hashes = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1)));
 			let mut max_height: BlockNumber = 0;
 			let mut new_hashes = Vec::new();
-			for (rh, rd) in hashes {
-				let h = try!(rh);
-				let d = try!(rd);
-				if d > self.highest_block.unwrap_or(0) {
-					self.highest_block = Some(d);
+			for (rh, rn) in hashes {
+				let hash = try!(rh);
+				let number = try!(rn);
+				if number > self.highest_block.unwrap_or(0) {
+					self.highest_block = Some(number);
 				}
-				if self.blocks.is_downloading(&h) {
+				if self.blocks.is_downloading(&hash) {
 					continue;
 				}
-				match io.chain().block_status(BlockID::Hash(h.clone())) {
+				if self.last_imported_block > number && self.last_imported_block - number > MAX_NEW_BLOCK_AGE {
+					trace!(target: "sync", "Ignored ancient new block hash {:?}", hash);
+					io.disable_peer(peer_id);
+					continue;
+				}
+				match io.chain().block_status(BlockID::Hash(hash.clone())) {
 					BlockStatus::InChain => {
-						trace!(target: "sync", "New block hash already in chain {:?}", h);
+						trace!(target: "sync", "New block hash already in chain {:?}", hash);
 					},
 					BlockStatus::Queued => {
-						trace!(target: "sync", "New hash block already queued {:?}", h);
+						trace!(target: "sync", "New hash block already queued {:?}", hash);
 					},
 					BlockStatus::Unknown => {
-						new_hashes.push(h.clone());
-						if d > max_height {
-							trace!(target: "sync", "New unknown block hash {:?}", h);
+						new_hashes.push(hash.clone());
+						if number > max_height {
+							trace!(target: "sync", "New unknown block hash {:?}", hash);
 							let peer = self.peers.get_mut(&peer_id).unwrap();
-							peer.latest_hash = h.clone();
-							peer.latest_number = Some(d);
-							max_height = d;
+							peer.latest_hash = hash.clone();
+							peer.latest_number = Some(number);
+							max_height = number;
 						}
 					},
 					BlockStatus::Bad => {
-						debug!(target: "sync", "Bad new block hash {:?}", h);
+						debug!(target: "sync", "Bad new block hash {:?}", hash);
 						io.disable_peer(peer_id);
 						return Ok(());
 					}
@@ -678,7 +700,7 @@ impl ChainSync {
 				self.sync_peer(io, p, false);
 			}
 		}
-		if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) {
+		if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && !p.expired) {
 			self.complete_sync();
 		}
 	}
@@ -913,6 +935,7 @@ impl ChainSync {
 	/// Reset peer status after request is complete.
 	fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) -> bool {
 		let peer = self.peers.get_mut(&peer_id).unwrap();
+		peer.expired = false;
 		if peer.asking != asking {
 			trace!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking);
 			peer.asking = PeerAsking::Nothing;
@@ -1183,7 +1206,12 @@ impl ChainSync {
 		let tick = time::precise_time_s();
 		let mut aborting = Vec::new();
 		for (peer_id, peer) in &self.peers {
-			if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
+			let timeout = match peer.asking {
+				PeerAsking::BlockHeaders | PeerAsking::Heads => (tick - peer.ask_time) > HEADERS_TIMEOUT_SEC,
+				PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC,
+				PeerAsking::Nothing => false,
+			};
+			if timeout {
 				trace!(target:"sync", "Timeout {}", peer_id);
 				io.disconnect_peer(*peer_id);
 				aborting.push(*peer_id);
@@ -1598,6 +1626,7 @@ mod tests {
 			asking_blocks: Vec::new(),
 			asking_hash: None,
 			ask_time: 0f64,
+			expired: false,
 		});
 		sync
 	}
diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs
index 84e25429d..e15d804e2 100644
--- a/sync/src/tests/chain.rs
+++ b/sync/src/tests/chain.rs
@@ -108,7 +108,7 @@ fn restart() {
 	net.restart_peer(0);

 	let status = net.peer(0).sync.read().status();
-	assert_eq!(status.state, SyncState::ChainHead);
+	assert_eq!(status.state, SyncState::Idle);
 }

 #[test]