Backports for beta (#1888)
* Sync to peers with confirmed fork block only (#1863) * Fixing gas conversion * Validating u256->usize conversion * Update cache usage on commiting block info (#1871) * Use UntrustedRlp for block verification (#1872) * take snapshot at specified block and slightly better informants (#1873) * prettier informant for snapshot creation * allow taking snapshot at a given block * minor tweaks * elaborate on cli * Send new block hashes to all peers (#1875) * Reduce max open files (#1876) * ws-rs update * Fixing test * Fixing miner deadlock (#1885)
This commit is contained in:
@@ -199,6 +199,16 @@ enum PeerAsking {
|
||||
Heads,
|
||||
}
|
||||
|
||||
#[derive(Clone, Eq, PartialEq)]
|
||||
enum ForkConfirmation {
|
||||
/// Fork block confirmation pending.
|
||||
Unconfirmed,
|
||||
/// Peers chain is too short to confirm the fork.
|
||||
TooShort,
|
||||
/// Fork is confurmed.
|
||||
Confirmed,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
/// Syncing peer information
|
||||
struct PeerInfo {
|
||||
@@ -224,13 +234,17 @@ struct PeerInfo {
|
||||
ask_time: f64,
|
||||
/// Pending request is expird and result should be ignored
|
||||
expired: bool,
|
||||
/// Peer fork confirmed
|
||||
confirmed: bool,
|
||||
/// Peer fork confirmation status
|
||||
confirmation: ForkConfirmation,
|
||||
}
|
||||
|
||||
impl PeerInfo {
|
||||
fn is_available(&self) -> bool {
|
||||
self.confirmed && !self.expired
|
||||
fn can_sync(&self) -> bool {
|
||||
self.confirmation == ForkConfirmation::Confirmed && !self.expired
|
||||
}
|
||||
|
||||
fn is_allowed(&self) -> bool {
|
||||
self.confirmation != ForkConfirmation::Unconfirmed && !self.expired
|
||||
}
|
||||
}
|
||||
|
||||
@@ -307,8 +321,8 @@ impl ChainSync {
|
||||
highest_block_number: self.highest_block.map(|n| max(n, self.last_imported_block)),
|
||||
blocks_received: if self.last_imported_block > self.starting_block { self.last_imported_block - self.starting_block } else { 0 },
|
||||
blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
|
||||
num_peers: self.peers.values().filter(|p| p.confirmed).count(),
|
||||
num_active_peers: self.peers.values().filter(|p| p.confirmed && p.asking != PeerAsking::Nothing).count(),
|
||||
num_peers: self.peers.values().filter(|p| p.is_allowed()).count(),
|
||||
num_active_peers: self.peers.values().filter(|p| p.is_allowed() && p.asking != PeerAsking::Nothing).count(),
|
||||
mem_used:
|
||||
self.blocks.heap_size()
|
||||
+ self.peers.heap_size_of_children()
|
||||
@@ -330,7 +344,7 @@ impl ChainSync {
|
||||
p.asking_blocks.clear();
|
||||
p.asking_hash = None;
|
||||
// mark any pending requests as expired
|
||||
if p.asking != PeerAsking::Nothing && p.confirmed {
|
||||
if p.asking != PeerAsking::Nothing && p.is_allowed() {
|
||||
p.expired = true;
|
||||
}
|
||||
}
|
||||
@@ -384,7 +398,7 @@ impl ChainSync {
|
||||
asking_hash: None,
|
||||
ask_time: 0f64,
|
||||
expired: false,
|
||||
confirmed: self.fork_block.is_none(),
|
||||
confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
|
||||
};
|
||||
|
||||
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
|
||||
@@ -427,14 +441,19 @@ impl ChainSync {
|
||||
Some(ref mut peer) if peer.asking == PeerAsking::ForkHeader => {
|
||||
let item_count = r.item_count();
|
||||
if item_count == 0 || (item_count == 1 && try!(r.at(0)).as_raw().sha3() == self.fork_block.unwrap().1) {
|
||||
trace!(target: "sync", "{}: Confirmed peer", peer_id);
|
||||
peer.asking = PeerAsking::Nothing;
|
||||
peer.confirmed = true;
|
||||
if item_count == 0 {
|
||||
trace!(target: "sync", "{}: Chain is too short to confirm the block", peer_id);
|
||||
peer.confirmation = ForkConfirmation::TooShort;
|
||||
} else {
|
||||
trace!(target: "sync", "{}: Confirmed peer", peer_id);
|
||||
peer.confirmation = ForkConfirmation::Confirmed;
|
||||
}
|
||||
true
|
||||
} else {
|
||||
trace!(target: "sync", "{}: Fork mismatch", peer_id);
|
||||
io.disconnect_peer(peer_id);
|
||||
false
|
||||
return Ok(());
|
||||
}
|
||||
},
|
||||
_ => false,
|
||||
@@ -586,7 +605,7 @@ impl ChainSync {
|
||||
/// Called by peer once it has new block bodies
|
||||
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
|
||||
fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||
if !self.peers.get(&peer_id).map_or(false, |p| p.confirmed) {
|
||||
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
@@ -654,7 +673,7 @@ impl ChainSync {
|
||||
|
||||
/// Handles `NewHashes` packet. Initiates headers download for any unknown hashes.
|
||||
fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||
if !self.peers.get(&peer_id).map_or(false, |p| p.confirmed) {
|
||||
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring new hashes from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
@@ -741,7 +760,7 @@ impl ChainSync {
|
||||
/// Resume downloading
|
||||
fn continue_sync(&mut self, io: &mut SyncIo) {
|
||||
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().filter_map(|(k, p)|
|
||||
if p.is_available() { Some((*k, p.difficulty.unwrap_or_else(U256::zero))) } else { None }).collect();
|
||||
if p.can_sync() { Some((*k, p.difficulty.unwrap_or_else(U256::zero))) } else { None }).collect();
|
||||
thread_rng().shuffle(&mut peers); //TODO: sort by rating
|
||||
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
|
||||
for (p, _) in peers {
|
||||
@@ -749,7 +768,7 @@ impl ChainSync {
|
||||
self.sync_peer(io, p, false);
|
||||
}
|
||||
}
|
||||
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.is_available()) {
|
||||
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.can_sync()) {
|
||||
self.complete_sync();
|
||||
}
|
||||
}
|
||||
@@ -775,7 +794,7 @@ impl ChainSync {
|
||||
}
|
||||
let (peer_latest, peer_difficulty) = {
|
||||
let peer = self.peers.get_mut(&peer_id).unwrap();
|
||||
if peer.asking != PeerAsking::Nothing || !peer.is_available() {
|
||||
if peer.asking != PeerAsking::Nothing || !peer.can_sync() {
|
||||
return;
|
||||
}
|
||||
if self.state == SyncState::Waiting {
|
||||
@@ -1037,7 +1056,7 @@ impl ChainSync {
|
||||
if !io.is_chain_queue_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
if !self.peers.get(&peer_id).map_or(false, |p| p.confirmed) {
|
||||
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
|
||||
}
|
||||
|
||||
@@ -1357,27 +1376,23 @@ impl ChainSync {
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
fn select_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> Vec<(PeerId, BlockNumber)> {
|
||||
fn select_random_lagging_peers(&mut self, peers: &[(PeerId, BlockNumber)]) -> Vec<(PeerId, BlockNumber)> {
|
||||
use rand::Rng;
|
||||
let mut lagging_peers = self.get_lagging_peers(chain_info, io);
|
||||
// take sqrt(x) peers
|
||||
let mut peers = peers.to_vec();
|
||||
let mut count = (self.peers.len() as f64).powf(0.5).round() as usize;
|
||||
count = min(count, MAX_PEERS_PROPAGATION);
|
||||
count = max(count, MIN_PEERS_PROPAGATION);
|
||||
::rand::thread_rng().shuffle(&mut lagging_peers);
|
||||
lagging_peers.into_iter().take(count).collect::<Vec<_>>()
|
||||
::rand::thread_rng().shuffle(&mut peers);
|
||||
peers.truncate(count);
|
||||
peers
|
||||
}
|
||||
|
||||
/// propagates latest block to lagging peers
|
||||
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256]) -> usize {
|
||||
let lucky_peers: Vec<_> = if sealed.is_empty() {
|
||||
self.select_lagging_peers(chain_info, io).iter().map(|&(id, _)| id).collect()
|
||||
} else {
|
||||
self.peers.keys().cloned().collect()
|
||||
};
|
||||
trace!(target: "sync", "Sending NewBlocks to {:?}", lucky_peers);
|
||||
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256], peers: &[(PeerId, BlockNumber)]) -> usize {
|
||||
trace!(target: "sync", "Sending NewBlocks to {:?}", peers);
|
||||
let mut sent = 0;
|
||||
for peer_id in lucky_peers {
|
||||
for &(peer_id, _) in peers {
|
||||
if sealed.is_empty() {
|
||||
let rlp = ChainSync::create_latest_block_rlp(io.chain());
|
||||
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
|
||||
@@ -1395,12 +1410,11 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// propagates new known hashes to all peers
|
||||
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
|
||||
let lucky_peers = self.select_lagging_peers(chain_info, io);
|
||||
trace!(target: "sync", "Sending NewHashes to {:?}", lucky_peers);
|
||||
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[(PeerId, BlockNumber)]) -> usize {
|
||||
trace!(target: "sync", "Sending NewHashes to {:?}", peers);
|
||||
let mut sent = 0;
|
||||
let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash();
|
||||
for (peer_id, peer_number) in lucky_peers {
|
||||
for &(peer_id, peer_number) in peers {
|
||||
let peer_best = if chain_info.best_block_number - peer_number > MAX_PEER_LAG_PROPAGATION as BlockNumber {
|
||||
// If we think peer is too far behind just send one latest hash
|
||||
last_parent.clone()
|
||||
@@ -1466,11 +1480,19 @@ impl ChainSync {
|
||||
fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) {
|
||||
let chain_info = io.chain().chain_info();
|
||||
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
|
||||
let hashes = self.propagate_new_hashes(&chain_info, io);
|
||||
let blocks = self.propagate_blocks(&chain_info, io, sealed);
|
||||
if blocks != 0 || hashes != 0 {
|
||||
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
|
||||
}
|
||||
let mut peers = self.get_lagging_peers(&chain_info, io);
|
||||
if sealed.is_empty() {
|
||||
let hashes = self.propagate_new_hashes(&chain_info, io, &peers);
|
||||
peers = self.select_random_lagging_peers(&peers);
|
||||
let blocks = self.propagate_blocks(&chain_info, io, sealed, &peers);
|
||||
if blocks != 0 || hashes != 0 {
|
||||
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
|
||||
}
|
||||
} else {
|
||||
self.propagate_blocks(&chain_info, io, sealed, &peers);
|
||||
self.propagate_new_hashes(&chain_info, io, &peers);
|
||||
trace!(target: "sync", "Sent sealed block to all peers");
|
||||
};
|
||||
}
|
||||
self.propagate_new_transactions(io);
|
||||
self.last_sent_block_number = chain_info.best_block_number;
|
||||
@@ -1693,7 +1715,7 @@ mod tests {
|
||||
asking_hash: None,
|
||||
ask_time: 0f64,
|
||||
expired: false,
|
||||
confirmed: true,
|
||||
confirmation: super::ForkConfirmation::Confirmed,
|
||||
});
|
||||
sync
|
||||
}
|
||||
@@ -1738,7 +1760,8 @@ mod tests {
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
let peer_count = sync.propagate_new_hashes(&chain_info, &mut io);
|
||||
let peers = sync.get_lagging_peers(&chain_info, &mut io);
|
||||
let peer_count = sync.propagate_new_hashes(&chain_info, &mut io, &peers);
|
||||
|
||||
// 1 message should be send
|
||||
assert_eq!(1, io.queue.len());
|
||||
@@ -1756,7 +1779,8 @@ mod tests {
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[]);
|
||||
let peers = sync.get_lagging_peers(&chain_info, &mut io);
|
||||
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
|
||||
|
||||
// 1 message should be send
|
||||
assert_eq!(1, io.queue.len());
|
||||
@@ -1775,7 +1799,8 @@ mod tests {
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()]);
|
||||
let peers = sync.get_lagging_peers(&chain_info, &mut io);
|
||||
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()], &peers);
|
||||
|
||||
// 1 message should be send
|
||||
assert_eq!(1, io.queue.len());
|
||||
@@ -1881,7 +1906,8 @@ mod tests {
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
sync.propagate_new_hashes(&chain_info, &mut io);
|
||||
let peers = sync.get_lagging_peers(&chain_info, &mut io);
|
||||
sync.propagate_new_hashes(&chain_info, &mut io, &peers);
|
||||
|
||||
let data = &io.queue[0].data.clone();
|
||||
let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(data));
|
||||
@@ -1899,7 +1925,8 @@ mod tests {
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
sync.propagate_blocks(&chain_info, &mut io, &[]);
|
||||
let peers = sync.get_lagging_peers(&chain_info, &mut io);
|
||||
sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
|
||||
|
||||
let data = &io.queue[0].data.clone();
|
||||
let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(data));
|
||||
|
||||
@@ -161,11 +161,11 @@ fn propagate_hashes() {
|
||||
net.trigger_chain_new_blocks(0); //first event just sets the marker
|
||||
net.trigger_chain_new_blocks(0);
|
||||
|
||||
// 5 peers to sync
|
||||
assert_eq!(5, net.peer(0).queue.len());
|
||||
// 5 peers with NewHahses, 4 with blocks
|
||||
assert_eq!(9, net.peer(0).queue.len());
|
||||
let mut hashes = 0;
|
||||
let mut blocks = 0;
|
||||
for i in 0..5 {
|
||||
for i in 0..net.peer(0).queue.len() {
|
||||
if net.peer(0).queue[i].packet_id == 0x1 {
|
||||
hashes += 1;
|
||||
}
|
||||
@@ -173,7 +173,8 @@ fn propagate_hashes() {
|
||||
blocks += 1;
|
||||
}
|
||||
}
|
||||
assert!(blocks + hashes == 5);
|
||||
assert_eq!(blocks, 4);
|
||||
assert_eq!(hashes, 5);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user