Prevent syncing to ancient blocks (#1693)
* Don't try to sync to ancient blocks * Fixed test
This commit is contained in:
parent
9a8fdeead9
commit
247495fba2
@ -30,7 +30,7 @@ use std::env;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use isatty::{stderr_isatty};
|
use isatty::{stderr_isatty, stdout_isatty};
|
||||||
use env_logger::LogBuilder;
|
use env_logger::LogBuilder;
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use util::RotatingLogger;
|
use util::RotatingLogger;
|
||||||
@ -89,7 +89,8 @@ pub fn setup_log(settings: &Settings) -> Arc<RotatingLogger> {
|
|||||||
builder.parse(s);
|
builder.parse(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
let enable_color = settings.color && stderr_isatty();
|
let isatty = stderr_isatty();
|
||||||
|
let enable_color = settings.color && isatty;
|
||||||
let logs = Arc::new(RotatingLogger::new(levels));
|
let logs = Arc::new(RotatingLogger::new(levels));
|
||||||
let logger = logs.clone();
|
let logger = logs.clone();
|
||||||
let maybe_file = settings.file.as_ref().map(|f| File::create(f).unwrap_or_else(|_| panic!("Cannot write to log file given: {}", f)));
|
let maybe_file = settings.file.as_ref().map(|f| File::create(f).unwrap_or_else(|_| panic!("Cannot write to log file given: {}", f)));
|
||||||
@ -115,6 +116,10 @@ pub fn setup_log(settings: &Settings) -> Arc<RotatingLogger> {
|
|||||||
let _ = file.write_all(b"\n");
|
let _ = file.write_all(b"\n");
|
||||||
}
|
}
|
||||||
logger.append(removed_color);
|
logger.append(removed_color);
|
||||||
|
if !isatty && record.level() <= LogLevel::Info && stdout_isatty() {
|
||||||
|
// duplicate INFO/WARN output to console
|
||||||
|
println!("{}", ret);
|
||||||
|
}
|
||||||
|
|
||||||
ret
|
ret
|
||||||
};
|
};
|
||||||
|
@ -106,7 +106,7 @@ impl Informant {
|
|||||||
false => t,
|
false => t,
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("{} {} {}",
|
info!(target: "import", "{} {} {}",
|
||||||
match importing {
|
match importing {
|
||||||
true => format!("{} {} {} {}+{} Qed",
|
true => format!("{} {} {} {}+{} Qed",
|
||||||
paint(White.bold(), format!("{:>8}", format!("#{}", chain_info.best_block_number))),
|
paint(White.bold(), format!("{:>8}", format!("#{}", chain_info.best_block_number))),
|
||||||
|
@ -119,6 +119,7 @@ const SUBCHAIN_SIZE: usize = 64;
|
|||||||
const MAX_ROUND_PARENTS: usize = 32;
|
const MAX_ROUND_PARENTS: usize = 32;
|
||||||
const MAX_NEW_HASHES: usize = 64;
|
const MAX_NEW_HASHES: usize = 64;
|
||||||
const MAX_TX_TO_IMPORT: usize = 512;
|
const MAX_TX_TO_IMPORT: usize = 512;
|
||||||
|
const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
|
||||||
|
|
||||||
const STATUS_PACKET: u8 = 0x00;
|
const STATUS_PACKET: u8 = 0x00;
|
||||||
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
||||||
@ -134,7 +135,8 @@ const NODE_DATA_PACKET: u8 = 0x0e;
|
|||||||
const GET_RECEIPTS_PACKET: u8 = 0x0f;
|
const GET_RECEIPTS_PACKET: u8 = 0x0f;
|
||||||
const RECEIPTS_PACKET: u8 = 0x10;
|
const RECEIPTS_PACKET: u8 = 0x10;
|
||||||
|
|
||||||
const CONNECTION_TIMEOUT_SEC: f64 = 15f64;
|
const HEADERS_TIMEOUT_SEC: f64 = 15f64;
|
||||||
|
const BODIES_TIMEOUT_SEC: f64 = 5f64;
|
||||||
|
|
||||||
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
||||||
/// Sync state
|
/// Sync state
|
||||||
@ -217,6 +219,8 @@ struct PeerInfo {
|
|||||||
asking_hash: Option<H256>,
|
asking_hash: Option<H256>,
|
||||||
/// Request timestamp
|
/// Request timestamp
|
||||||
ask_time: f64,
|
ask_time: f64,
|
||||||
|
/// Pending request is expird and result should be ignored
|
||||||
|
expired: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Blockchain sync handler.
|
/// Blockchain sync handler.
|
||||||
@ -311,6 +315,10 @@ impl ChainSync {
|
|||||||
for (_, ref mut p) in &mut self.peers {
|
for (_, ref mut p) in &mut self.peers {
|
||||||
p.asking_blocks.clear();
|
p.asking_blocks.clear();
|
||||||
p.asking_hash = None;
|
p.asking_hash = None;
|
||||||
|
// mark any pending requests as expired
|
||||||
|
if p.asking != PeerAsking::Nothing {
|
||||||
|
p.expired = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
self.syncing_difficulty = From::from(0u64);
|
self.syncing_difficulty = From::from(0u64);
|
||||||
self.state = SyncState::Idle;
|
self.state = SyncState::Idle;
|
||||||
@ -361,6 +369,7 @@ impl ChainSync {
|
|||||||
asking_blocks: Vec::new(),
|
asking_blocks: Vec::new(),
|
||||||
asking_hash: None,
|
asking_hash: None,
|
||||||
ask_time: 0f64,
|
ask_time: 0f64,
|
||||||
|
expired: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
|
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
|
||||||
@ -496,6 +505,9 @@ impl ChainSync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.collect_blocks(io);
|
self.collect_blocks(io);
|
||||||
|
// give a task to the same peer first if received valuable headers.
|
||||||
|
self.sync_peer(io, peer_id, false);
|
||||||
|
// give tasks to other peers
|
||||||
self.continue_sync(io);
|
self.continue_sync(io);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -548,6 +560,11 @@ impl ChainSync {
|
|||||||
peer.latest_hash = header.hash();
|
peer.latest_hash = header.hash();
|
||||||
peer.latest_number = Some(header.number());
|
peer.latest_number = Some(header.number());
|
||||||
}
|
}
|
||||||
|
if self.last_imported_block > header.number() && self.last_imported_block - header.number() > MAX_NEW_BLOCK_AGE {
|
||||||
|
trace!(target: "sync", "Ignored ancient new block {:?}", h);
|
||||||
|
io.disable_peer(peer_id);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
|
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
|
||||||
Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {
|
Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {
|
||||||
trace!(target: "sync", "New block already in chain {:?}", h);
|
trace!(target: "sync", "New block already in chain {:?}", h);
|
||||||
@ -605,34 +622,39 @@ impl ChainSync {
|
|||||||
let hashes = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1)));
|
let hashes = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1)));
|
||||||
let mut max_height: BlockNumber = 0;
|
let mut max_height: BlockNumber = 0;
|
||||||
let mut new_hashes = Vec::new();
|
let mut new_hashes = Vec::new();
|
||||||
for (rh, rd) in hashes {
|
for (rh, rn) in hashes {
|
||||||
let h = try!(rh);
|
let hash = try!(rh);
|
||||||
let d = try!(rd);
|
let number = try!(rn);
|
||||||
if d > self.highest_block.unwrap_or(0) {
|
if number > self.highest_block.unwrap_or(0) {
|
||||||
self.highest_block = Some(d);
|
self.highest_block = Some(number);
|
||||||
}
|
}
|
||||||
if self.blocks.is_downloading(&h) {
|
if self.blocks.is_downloading(&hash) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
match io.chain().block_status(BlockID::Hash(h.clone())) {
|
if self.last_imported_block > number && self.last_imported_block - number > MAX_NEW_BLOCK_AGE {
|
||||||
|
trace!(target: "sync", "Ignored ancient new block hash {:?}", hash);
|
||||||
|
io.disable_peer(peer_id);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
match io.chain().block_status(BlockID::Hash(hash.clone())) {
|
||||||
BlockStatus::InChain => {
|
BlockStatus::InChain => {
|
||||||
trace!(target: "sync", "New block hash already in chain {:?}", h);
|
trace!(target: "sync", "New block hash already in chain {:?}", hash);
|
||||||
},
|
},
|
||||||
BlockStatus::Queued => {
|
BlockStatus::Queued => {
|
||||||
trace!(target: "sync", "New hash block already queued {:?}", h);
|
trace!(target: "sync", "New hash block already queued {:?}", hash);
|
||||||
},
|
},
|
||||||
BlockStatus::Unknown => {
|
BlockStatus::Unknown => {
|
||||||
new_hashes.push(h.clone());
|
new_hashes.push(hash.clone());
|
||||||
if d > max_height {
|
if number > max_height {
|
||||||
trace!(target: "sync", "New unknown block hash {:?}", h);
|
trace!(target: "sync", "New unknown block hash {:?}", hash);
|
||||||
let peer = self.peers.get_mut(&peer_id).unwrap();
|
let peer = self.peers.get_mut(&peer_id).unwrap();
|
||||||
peer.latest_hash = h.clone();
|
peer.latest_hash = hash.clone();
|
||||||
peer.latest_number = Some(d);
|
peer.latest_number = Some(number);
|
||||||
max_height = d;
|
max_height = number;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
BlockStatus::Bad => {
|
BlockStatus::Bad => {
|
||||||
debug!(target: "sync", "Bad new block hash {:?}", h);
|
debug!(target: "sync", "Bad new block hash {:?}", hash);
|
||||||
io.disable_peer(peer_id);
|
io.disable_peer(peer_id);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
@ -678,7 +700,7 @@ impl ChainSync {
|
|||||||
self.sync_peer(io, p, false);
|
self.sync_peer(io, p, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) {
|
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && !p.expired) {
|
||||||
self.complete_sync();
|
self.complete_sync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -913,6 +935,7 @@ impl ChainSync {
|
|||||||
/// Reset peer status after request is complete.
|
/// Reset peer status after request is complete.
|
||||||
fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) -> bool {
|
fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) -> bool {
|
||||||
let peer = self.peers.get_mut(&peer_id).unwrap();
|
let peer = self.peers.get_mut(&peer_id).unwrap();
|
||||||
|
peer.expired = false;
|
||||||
if peer.asking != asking {
|
if peer.asking != asking {
|
||||||
trace!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking);
|
trace!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking);
|
||||||
peer.asking = PeerAsking::Nothing;
|
peer.asking = PeerAsking::Nothing;
|
||||||
@ -1183,7 +1206,12 @@ impl ChainSync {
|
|||||||
let tick = time::precise_time_s();
|
let tick = time::precise_time_s();
|
||||||
let mut aborting = Vec::new();
|
let mut aborting = Vec::new();
|
||||||
for (peer_id, peer) in &self.peers {
|
for (peer_id, peer) in &self.peers {
|
||||||
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
|
let timeout = match peer.asking {
|
||||||
|
PeerAsking::BlockHeaders | PeerAsking::Heads => (tick - peer.ask_time) > HEADERS_TIMEOUT_SEC,
|
||||||
|
PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC,
|
||||||
|
PeerAsking::Nothing => false,
|
||||||
|
};
|
||||||
|
if timeout {
|
||||||
trace!(target:"sync", "Timeout {}", peer_id);
|
trace!(target:"sync", "Timeout {}", peer_id);
|
||||||
io.disconnect_peer(*peer_id);
|
io.disconnect_peer(*peer_id);
|
||||||
aborting.push(*peer_id);
|
aborting.push(*peer_id);
|
||||||
@ -1598,6 +1626,7 @@ mod tests {
|
|||||||
asking_blocks: Vec::new(),
|
asking_blocks: Vec::new(),
|
||||||
asking_hash: None,
|
asking_hash: None,
|
||||||
ask_time: 0f64,
|
ask_time: 0f64,
|
||||||
|
expired: false,
|
||||||
});
|
});
|
||||||
sync
|
sync
|
||||||
}
|
}
|
||||||
|
@ -108,7 +108,7 @@ fn restart() {
|
|||||||
net.restart_peer(0);
|
net.restart_peer(0);
|
||||||
|
|
||||||
let status = net.peer(0).sync.read().status();
|
let status = net.peer(0).sync.read().status();
|
||||||
assert_eq!(status.state, SyncState::ChainHead);
|
assert_eq!(status.state, SyncState::Idle);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
Loading…
Reference in New Issue
Block a user