Sync to peers with confirmed fork block only (#1863)
This commit is contained in:
parent
e72fc5398a
commit
4f32a9ccc1
@ -199,6 +199,16 @@ enum PeerAsking {
|
|||||||
Heads,
|
Heads,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Eq, PartialEq)]
|
||||||
|
enum ForkConfirmation {
|
||||||
|
/// Fork block confirmation pending.
|
||||||
|
Unconfirmed,
|
||||||
|
/// Peers chain is too short to confirm the fork.
|
||||||
|
TooShort,
|
||||||
|
/// Fork is confurmed.
|
||||||
|
Confirmed,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
/// Syncing peer information
|
/// Syncing peer information
|
||||||
struct PeerInfo {
|
struct PeerInfo {
|
||||||
@ -224,13 +234,17 @@ struct PeerInfo {
|
|||||||
ask_time: f64,
|
ask_time: f64,
|
||||||
/// Pending request is expird and result should be ignored
|
/// Pending request is expird and result should be ignored
|
||||||
expired: bool,
|
expired: bool,
|
||||||
/// Peer fork confirmed
|
/// Peer fork confirmation status
|
||||||
confirmed: bool,
|
confirmation: ForkConfirmation,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PeerInfo {
|
impl PeerInfo {
|
||||||
fn is_available(&self) -> bool {
|
fn can_sync(&self) -> bool {
|
||||||
self.confirmed && !self.expired
|
self.confirmation == ForkConfirmation::Confirmed && !self.expired
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_allowed(&self) -> bool {
|
||||||
|
self.confirmation != ForkConfirmation::Unconfirmed && !self.expired
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -307,8 +321,8 @@ impl ChainSync {
|
|||||||
highest_block_number: self.highest_block.map(|n| max(n, self.last_imported_block)),
|
highest_block_number: self.highest_block.map(|n| max(n, self.last_imported_block)),
|
||||||
blocks_received: if self.last_imported_block > self.starting_block { self.last_imported_block - self.starting_block } else { 0 },
|
blocks_received: if self.last_imported_block > self.starting_block { self.last_imported_block - self.starting_block } else { 0 },
|
||||||
blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
|
blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
|
||||||
num_peers: self.peers.values().filter(|p| p.confirmed).count(),
|
num_peers: self.peers.values().filter(|p| p.is_allowed()).count(),
|
||||||
num_active_peers: self.peers.values().filter(|p| p.confirmed && p.asking != PeerAsking::Nothing).count(),
|
num_active_peers: self.peers.values().filter(|p| p.is_allowed() && p.asking != PeerAsking::Nothing).count(),
|
||||||
mem_used:
|
mem_used:
|
||||||
self.blocks.heap_size()
|
self.blocks.heap_size()
|
||||||
+ self.peers.heap_size_of_children()
|
+ self.peers.heap_size_of_children()
|
||||||
@ -330,7 +344,7 @@ impl ChainSync {
|
|||||||
p.asking_blocks.clear();
|
p.asking_blocks.clear();
|
||||||
p.asking_hash = None;
|
p.asking_hash = None;
|
||||||
// mark any pending requests as expired
|
// mark any pending requests as expired
|
||||||
if p.asking != PeerAsking::Nothing && p.confirmed {
|
if p.asking != PeerAsking::Nothing && p.is_allowed() {
|
||||||
p.expired = true;
|
p.expired = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -384,7 +398,7 @@ impl ChainSync {
|
|||||||
asking_hash: None,
|
asking_hash: None,
|
||||||
ask_time: 0f64,
|
ask_time: 0f64,
|
||||||
expired: false,
|
expired: false,
|
||||||
confirmed: self.fork_block.is_none(),
|
confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
|
||||||
};
|
};
|
||||||
|
|
||||||
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
|
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
|
||||||
@ -427,14 +441,19 @@ impl ChainSync {
|
|||||||
Some(ref mut peer) if peer.asking == PeerAsking::ForkHeader => {
|
Some(ref mut peer) if peer.asking == PeerAsking::ForkHeader => {
|
||||||
let item_count = r.item_count();
|
let item_count = r.item_count();
|
||||||
if item_count == 0 || (item_count == 1 && try!(r.at(0)).as_raw().sha3() == self.fork_block.unwrap().1) {
|
if item_count == 0 || (item_count == 1 && try!(r.at(0)).as_raw().sha3() == self.fork_block.unwrap().1) {
|
||||||
trace!(target: "sync", "{}: Confirmed peer", peer_id);
|
|
||||||
peer.asking = PeerAsking::Nothing;
|
peer.asking = PeerAsking::Nothing;
|
||||||
peer.confirmed = true;
|
if item_count == 0 {
|
||||||
|
trace!(target: "sync", "{}: Chain is too short to confirm the block", peer_id);
|
||||||
|
peer.confirmation = ForkConfirmation::TooShort;
|
||||||
|
} else {
|
||||||
|
trace!(target: "sync", "{}: Confirmed peer", peer_id);
|
||||||
|
peer.confirmation = ForkConfirmation::Confirmed;
|
||||||
|
}
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
trace!(target: "sync", "{}: Fork mismatch", peer_id);
|
trace!(target: "sync", "{}: Fork mismatch", peer_id);
|
||||||
io.disconnect_peer(peer_id);
|
io.disconnect_peer(peer_id);
|
||||||
false
|
return Ok(());
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
_ => false,
|
_ => false,
|
||||||
@ -586,7 +605,7 @@ impl ChainSync {
|
|||||||
/// Called by peer once it has new block bodies
|
/// Called by peer once it has new block bodies
|
||||||
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
|
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
|
||||||
fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||||
if !self.peers.get(&peer_id).map_or(false, |p| p.confirmed) {
|
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||||
trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id);
|
trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
@ -654,7 +673,7 @@ impl ChainSync {
|
|||||||
|
|
||||||
/// Handles `NewHashes` packet. Initiates headers download for any unknown hashes.
|
/// Handles `NewHashes` packet. Initiates headers download for any unknown hashes.
|
||||||
fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||||
if !self.peers.get(&peer_id).map_or(false, |p| p.confirmed) {
|
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||||
trace!(target: "sync", "Ignoring new hashes from unconfirmed peer {}", peer_id);
|
trace!(target: "sync", "Ignoring new hashes from unconfirmed peer {}", peer_id);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
@ -741,7 +760,7 @@ impl ChainSync {
|
|||||||
/// Resume downloading
|
/// Resume downloading
|
||||||
fn continue_sync(&mut self, io: &mut SyncIo) {
|
fn continue_sync(&mut self, io: &mut SyncIo) {
|
||||||
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().filter_map(|(k, p)|
|
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().filter_map(|(k, p)|
|
||||||
if p.is_available() { Some((*k, p.difficulty.unwrap_or_else(U256::zero))) } else { None }).collect();
|
if p.can_sync() { Some((*k, p.difficulty.unwrap_or_else(U256::zero))) } else { None }).collect();
|
||||||
thread_rng().shuffle(&mut peers); //TODO: sort by rating
|
thread_rng().shuffle(&mut peers); //TODO: sort by rating
|
||||||
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
|
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
|
||||||
for (p, _) in peers {
|
for (p, _) in peers {
|
||||||
@ -749,7 +768,7 @@ impl ChainSync {
|
|||||||
self.sync_peer(io, p, false);
|
self.sync_peer(io, p, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.is_available()) {
|
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.can_sync()) {
|
||||||
self.complete_sync();
|
self.complete_sync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -775,7 +794,7 @@ impl ChainSync {
|
|||||||
}
|
}
|
||||||
let (peer_latest, peer_difficulty) = {
|
let (peer_latest, peer_difficulty) = {
|
||||||
let peer = self.peers.get_mut(&peer_id).unwrap();
|
let peer = self.peers.get_mut(&peer_id).unwrap();
|
||||||
if peer.asking != PeerAsking::Nothing || !peer.is_available() {
|
if peer.asking != PeerAsking::Nothing || !peer.can_sync() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if self.state == SyncState::Waiting {
|
if self.state == SyncState::Waiting {
|
||||||
@ -1037,7 +1056,7 @@ impl ChainSync {
|
|||||||
if !io.is_chain_queue_empty() {
|
if !io.is_chain_queue_empty() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
if !self.peers.get(&peer_id).map_or(false, |p| p.confirmed) {
|
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||||
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
|
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1693,7 +1712,7 @@ mod tests {
|
|||||||
asking_hash: None,
|
asking_hash: None,
|
||||||
ask_time: 0f64,
|
ask_time: 0f64,
|
||||||
expired: false,
|
expired: false,
|
||||||
confirmed: true,
|
confirmation: super::ForkConfirmation::Confirmed,
|
||||||
});
|
});
|
||||||
sync
|
sync
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user