Merge with master

This commit is contained in:
Robert Habermeier
2016-06-29 14:46:29 +02:00
parent 3850ee64bb
commit 49024a4f28
235 changed files with 36456 additions and 4814 deletions

View File

@@ -95,12 +95,12 @@ use ethcore::views::{HeaderView, BlockView};
use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo};
use ethcore::error::*;
use ethcore::transaction::SignedTransaction;
use ethcore::block::Block;
use io::SyncIo;
use time;
use super::SyncConfig;
use blocks::BlockCollection;
use rand::{thread_rng, Rng};
known_heap_size!(0, PeerInfo);
@@ -118,6 +118,9 @@ const MIN_PEERS_PROPAGATION: usize = 4;
const MAX_PEERS_PROPAGATION: usize = 128;
const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20;
const SUBCHAIN_SIZE: usize = 64;
const MAX_ROUND_PARENTS: usize = 32;
const MAX_NEW_HASHES: usize = 64;
const MAX_TX_TO_IMPORT: usize = 512;
const STATUS_PACKET: u8 = 0x00;
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
@@ -238,6 +241,8 @@ pub struct ChainSync {
_max_download_ahead_blocks: usize,
/// Number of blocks imported this round
imported_this_round: Option<usize>,
/// Block parents imported this round (hash, parent)
round_parents: VecDeque<(H256, H256)>,
/// Network ID
network_id: U256,
}
@@ -260,6 +265,7 @@ impl ChainSync {
syncing_difficulty: U256::from(0u64),
last_sent_block_number: 0,
imported_this_round: None,
round_parents: VecDeque::new(),
_max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id,
};
@@ -281,11 +287,9 @@ impl ChainSync {
num_peers: self.peers.len(),
num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(),
mem_used:
// TODO: https://github.com/servo/heapsize/pull/50
//+ self.downloading_bodies.heap_size_of_children()
//+ self.downloading_headers.heap_size_of_children()
self.blocks.heap_size()
+ self.peers.heap_size_of_children(),
+ self.peers.heap_size_of_children()
+ self.round_parents.heap_size_of_children(),
}
}
@@ -305,7 +309,6 @@ impl ChainSync {
}
self.syncing_difficulty = From::from(0u64);
self.state = SyncState::Idle;
self.blocks.clear();
self.active_peers = self.peers.keys().cloned().collect();
}
@@ -319,6 +322,7 @@ impl ChainSync {
/// Remove peer from active peer set
fn deactivate_peer(&mut self, io: &mut SyncIo, peer_id: PeerId) {
trace!(target: "sync", "Deactivating peer {}", peer_id);
self.active_peers.remove(&peer_id);
if self.active_peers.is_empty() {
trace!(target: "sync", "No more active peers");
@@ -355,9 +359,13 @@ impl ChainSync {
};
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
if io.is_expired() {
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
return Ok(());
}
if self.peers.contains_key(&peer_id) {
warn!("Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
debug!(target: "sync", "Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
return Ok(());
}
let chain_info = io.chain().chain_info();
@@ -383,14 +391,15 @@ impl ChainSync {
/// Called by peer once it has new block headers during sync
fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
self.clear_peer_download(peer_id);
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders };
if !self.reset_peer_asking(peer_id, expected_asking) {
if !self.reset_peer_asking(peer_id, expected_asking) || expected_hash.is_none() {
trace!(target: "sync", "Ignored unexpected headers");
self.continue_sync(io);
return Ok(());
}
let item_count = r.item_count();
trace!(target: "sync", "{} -> BlockHeaders ({} entries)", peer_id, item_count);
trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}", peer_id, item_count, self.state);
if self.state == SyncState::Idle {
trace!(target: "sync", "Ignored unexpected block headers");
self.continue_sync(io);
@@ -403,14 +412,22 @@ impl ChainSync {
}
if item_count == 0 && (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) {
self.deactivate_peer(io, peer_id); //TODO: is this too harsh?
self.continue_sync(io);
return Ok(());
}
let mut headers = Vec::new();
let mut hashes = Vec::new();
let mut valid_response = item_count == 0; //empty response is valid
for i in 0..item_count {
let info: BlockHeader = try!(r.val_at(i));
let number = BlockNumber::from(info.number);
// Check if any of the headers matches the hash we requested
if !valid_response {
if let Some(expected) = expected_hash {
valid_response = expected == info.hash()
}
}
if self.blocks.contains(&info.hash()) {
trace!(target: "sync", "Skipping existing block header {} ({:?})", number, info.hash());
continue;
@@ -441,6 +458,17 @@ impl ChainSync {
}
}
// Disable the peer for this syncing round if it gives invalid chain
if !valid_response {
trace!(target: "sync", "{} Deactivated for invalid headers response", peer_id);
self.deactivate_peer(io, peer_id);
}
if headers.is_empty() {
// Peer does not have any new subchain heads, deactivate it nd try with another
trace!(target: "sync", "{} Deactivated for no data", peer_id);
self.deactivate_peer(io, peer_id);
}
match self.state {
SyncState::ChainHead => {
if headers.is_empty() {
@@ -488,7 +516,10 @@ impl ChainSync {
for i in 0..item_count {
bodies.push(try!(r.at(i)).as_raw().to_vec());
}
self.blocks.insert_bodies(bodies);
if self.blocks.insert_bodies(bodies) != item_count {
trace!(target: "sync", "Deactivating peer for giving invalid block bodies");
self.deactivate_peer(io, peer_id);
}
self.collect_blocks(io);
}
self.continue_sync(io);
@@ -502,57 +533,55 @@ impl ChainSync {
let header_rlp = try!(block_rlp.at(0));
let h = header_rlp.as_raw().sha3();
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h);
if self.state != SyncState::Idle {
trace!(target: "sync", "NewBlock ignored while seeking");
return Ok(());
}
let header: BlockHeader = try!(header_rlp.as_val());
if header.number() > self.highest_block.unwrap_or(0) {
self.highest_block = Some(header.number());
}
let mut unknown = false;
{
let peer = self.peers.get_mut(&peer_id).unwrap();
peer.latest_hash = header.hash();
peer.latest_number = Some(header.number());
}
if header.number <= self.last_imported_block + 1 {
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
Err(Error::Import(ImportError::AlreadyInChain)) => {
trace!(target: "sync", "New block already in chain {:?}", h);
},
Err(Error::Import(ImportError::AlreadyQueued)) => {
trace!(target: "sync", "New block already queued {:?}", h);
},
Ok(_) => {
if header.number == self.last_imported_block + 1 {
self.last_imported_block = header.number;
self.last_imported_hash = header.hash();
}
trace!(target: "sync", "New block queued {:?} ({})", h, header.number);
},
Err(Error::Block(BlockError::UnknownParent(p))) => {
unknown = true;
trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h);
},
Err(e) => {
debug!(target: "sync", "Bad new block {:?} : {:?}", h, e);
io.disable_peer(peer_id);
}
};
}
else {
unknown = true;
}
if unknown {
trace!(target: "sync", "New unknown block {:?}", h);
//TODO: handle too many unknown blocks
let difficulty: U256 = try!(r.val_at(1));
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
if peer.difficulty.map_or(true, |pd| difficulty > pd) {
//self.state = SyncState::ChainHead;
peer.difficulty = Some(difficulty);
trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
Err(Error::Import(ImportError::AlreadyInChain)) => {
trace!(target: "sync", "New block already in chain {:?}", h);
},
Err(Error::Import(ImportError::AlreadyQueued)) => {
trace!(target: "sync", "New block already queued {:?}", h);
},
Ok(_) => {
if header.number == self.last_imported_block + 1 {
self.last_imported_block = header.number;
self.last_imported_hash = header.hash();
}
trace!(target: "sync", "New block queued {:?} ({})", h, header.number);
},
Err(Error::Block(BlockError::UnknownParent(p))) => {
unknown = true;
trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h);
},
Err(e) => {
debug!(target: "sync", "Bad new block {:?} : {:?}", h, e);
io.disable_peer(peer_id);
}
};
if unknown {
if self.state != SyncState::Idle {
trace!(target: "sync", "NewBlock ignored while seeking");
} else {
trace!(target: "sync", "New unknown block {:?}", h);
//TODO: handle too many unknown blocks
let difficulty: U256 = try!(r.val_at(1));
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
if peer.difficulty.map_or(true, |pd| difficulty > pd) {
//self.state = SyncState::ChainHead;
peer.difficulty = Some(difficulty);
trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
}
}
self.sync_peer(io, peer_id, true);
}
self.sync_peer(io, peer_id, true);
}
Ok(())
}
@@ -561,15 +590,22 @@ impl ChainSync {
fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
if self.state != SyncState::Idle {
trace!(target: "sync", "Ignoring new hashes since we're already downloading.");
let max = r.iter().take(MAX_NEW_HASHES).map(|item| item.val_at::<BlockNumber>(1).unwrap_or(0)).fold(0u64, max);
if max > self.highest_block.unwrap_or(0) {
self.highest_block = Some(max);
}
return Ok(());
}
trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count());
let hashes = r.iter().map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1)));
let hashes = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1)));
let mut max_height: BlockNumber = 0;
let mut new_hashes = Vec::new();
for (rh, rd) in hashes {
let h = try!(rh);
let d = try!(rd);
if d > self.highest_block.unwrap_or(0) {
self.highest_block = Some(d);
}
if self.blocks.is_downloading(&h) {
continue;
}
@@ -630,7 +666,8 @@ impl ChainSync {
/// Resume downloading
fn continue_sync(&mut self, io: &mut SyncIo) {
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect();
peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating
thread_rng().shuffle(&mut peers); //TODO: sort by rating
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
for (p, _) in peers {
if self.active_peers.contains(&p) {
self.sync_peer(io, p, false);
@@ -655,7 +692,11 @@ impl ChainSync {
}
/// Find something to do for a peer. Called for a new peer or when a peer is done with it's task.
fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) {
fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) {
if !self.active_peers.contains(&peer_id) {
trace!(target: "sync", "Skipping deactivated peer");
return;
}
let (peer_latest, peer_difficulty) = {
let peer = self.peers.get_mut(&peer_id).unwrap();
if peer.asking != PeerAsking::Nothing {
@@ -703,18 +744,28 @@ impl ChainSync {
trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block);
// Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
// from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
if self.imported_this_round.is_some() && self.imported_this_round.unwrap() == 0 && self.last_imported_block > 0 {
match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) {
Some(h) => {
match self.imported_this_round {
Some(n) if n == 0 && self.last_imported_block > 0 => {
// nothing was imported last round, step back to a previous block
// search parent in last round known parents first
if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == self.last_imported_hash) {
self.last_imported_block -= 1;
self.last_imported_hash = h;
trace!(target: "sync", "Searching common header {} ({})", self.last_imported_block, self.last_imported_hash);
self.last_imported_hash = p.clone();
trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
} else {
match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) {
Some(h) => {
self.last_imported_block -= 1;
self.last_imported_hash = h;
trace!(target: "sync", "Searching common header in the blockchain {} ({})", self.last_imported_block, self.last_imported_hash);
}
None => {
debug!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash);
}
}
}
None => {
// TODO: get hash by number from the block queue
trace!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash);
}
}
},
_ => (),
}
self.imported_this_round = None;
}
@@ -761,6 +812,15 @@ impl ChainSync {
peer.asking_blocks.clear();
}
fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) {
self.last_imported_block = number;
self.last_imported_hash = hash.clone();
self.round_parents.push_back((hash.clone(), parent.clone()));
if self.round_parents.len() > MAX_ROUND_PARENTS {
self.round_parents.pop_front();
}
}
/// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import.
fn collect_blocks(&mut self, io: &mut SyncIo) {
let mut restart = false;
@@ -768,8 +828,10 @@ impl ChainSync {
let blocks = self.blocks.drain();
let count = blocks.len();
for block in blocks {
let number = BlockView::new(&block).header_view().number();
let h = BlockView::new(&block).header_view().sha3();
let (h, number, parent) = {
let header = BlockView::new(&block).header_view();
(header.sha3(), header.number(), header.parent_hash())
};
// Perform basic block verification
if !Block::is_good(&block) {
@@ -780,20 +842,17 @@ impl ChainSync {
match io.chain().import_block(block) {
Err(Error::Import(ImportError::AlreadyInChain)) => {
self.last_imported_block = number;
self.last_imported_hash = h.clone();
trace!(target: "sync", "Block already in chain {:?}", h);
self.block_imported(&h, number, &parent);
},
Err(Error::Import(ImportError::AlreadyQueued)) => {
self.last_imported_block = number;
self.last_imported_hash = h.clone();
trace!(target: "sync", "Block already queued {:?}", h);
self.block_imported(&h, number, &parent);
},
Ok(_) => {
trace!(target: "sync", "Block queued {:?}", h);
self.last_imported_block = number;
self.last_imported_hash = h.clone();
imported.insert(h.clone());
self.block_imported(&h, number, &parent);
},
Err(Error::Block(BlockError::UnknownParent(_))) if self.state == SyncState::NewBlocks => {
trace!(target: "sync", "Unknown new block parent, restarting sync");
@@ -831,6 +890,9 @@ impl ChainSync {
rlp.append(&skip);
rlp.append(&if reverse {1u32} else {0u32});
self.send_request(sync, peer_id, asking, GET_BLOCK_HEADERS_PACKET, rlp.out());
self.peers.get_mut(&peer_id)
.expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed")
.asking_hash = Some(h.clone());
}
/// Request block bodies from a peer
@@ -886,15 +948,15 @@ impl ChainSync {
return Ok(());
}
let item_count = r.item_count();
let mut item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
item_count = min(item_count, MAX_TX_TO_IMPORT);
let mut transactions = Vec::with_capacity(item_count);
for i in 0..item_count {
let tx: SignedTransaction = try!(r.val_at(i));
for i in 0 .. item_count {
let tx = try!(r.at(i)).as_raw().to_vec();
transactions.push(tx);
}
let _ = io.chain().import_transactions(transactions);
io.chain().queue_transactions(transactions);
Ok(())
}
@@ -911,7 +973,7 @@ impl ChainSync {
}
/// Respond to GetBlockHeaders request
fn return_block_headers(io: &SyncIo, r: &UntrustedRlp) -> RlpResponseResult {
fn return_block_headers(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
// Packet layout:
// [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ]
let max_headers: usize = try!(r.val_at(1));
@@ -921,13 +983,25 @@ impl ChainSync {
let number = if try!(r.at(0)).size() == 32 {
// id is a hash
let hash: H256 = try!(r.val_at(0));
trace!(target: "sync", "-> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse);
trace!(target: "sync", "{} -> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", peer_id, hash, max_headers, skip, reverse);
match io.chain().block_header(BlockID::Hash(hash)) {
Some(hdr) => From::from(HeaderView::new(&hdr).number()),
Some(hdr) => {
let number = From::from(HeaderView::new(&hdr).number());
debug_assert_eq!(HeaderView::new(&hdr).sha3(), hash);
if max_headers == 1 || io.chain().block_hash(BlockID::Number(number)) != Some(hash) {
// Non canonical header or single header requested
// TODO: handle single-step reverse hashchains of non-canon hashes
trace!(target:"sync", "Returning single header: {:?}", hash);
let mut rlp = RlpStream::new_list(1);
rlp.append_raw(&hdr, 1);
return Ok(Some((BLOCK_HEADERS_PACKET, rlp)));
}
number
}
None => return Ok(Some((BLOCK_HEADERS_PACKET, RlpStream::new_list(0)))) //no such header, return nothing
}
} else {
trace!(target: "sync", "-> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", try!(r.val_at::<BlockNumber>(0)), max_headers, skip, reverse);
trace!(target: "sync", "{} -> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", peer_id, try!(r.val_at::<BlockNumber>(0)), max_headers, skip, reverse);
try!(r.val_at(0))
};
@@ -962,13 +1036,13 @@ impl ChainSync {
}
/// Respond to GetBlockBodies request
fn return_block_bodies(io: &SyncIo, r: &UntrustedRlp) -> RlpResponseResult {
fn return_block_bodies(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
let mut count = r.item_count();
if count == 0 {
debug!(target: "sync", "Empty GetBlockBodies request, ignoring.");
return Ok(None);
}
trace!(target: "sync", "-> GetBlockBodies: {} entries", count);
trace!(target: "sync", "{} -> GetBlockBodies: {} entries", peer_id, count);
count = min(count, MAX_BODIES_TO_SEND);
let mut added = 0usize;
let mut data = Bytes::new();
@@ -985,8 +1059,9 @@ impl ChainSync {
}
/// Respond to GetNodeData request
fn return_node_data(io: &SyncIo, r: &UntrustedRlp) -> RlpResponseResult {
fn return_node_data(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
let mut count = r.item_count();
trace!(target: "sync", "{} -> GetNodeData: {} entries", peer_id, count);
if count == 0 {
debug!(target: "sync", "Empty GetNodeData request, ignoring.");
return Ok(None);
@@ -1000,13 +1075,15 @@ impl ChainSync {
added += 1;
}
}
trace!(target: "sync", "{} -> GetNodeData: return {} entries", peer_id, added);
let mut rlp = RlpStream::new_list(added);
rlp.append_raw(&data, added);
Ok(Some((NODE_DATA_PACKET, rlp)))
}
fn return_receipts(io: &SyncIo, rlp: &UntrustedRlp) -> RlpResponseResult {
fn return_receipts(io: &SyncIo, rlp: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
let mut count = rlp.item_count();
trace!(target: "sync", "{} -> GetReceipts: {} entries", peer_id, count);
if count == 0 {
debug!(target: "sync", "Empty GetReceipts request, ignoring.");
return Ok(None);
@@ -1028,11 +1105,11 @@ impl ChainSync {
Ok(Some((RECEIPTS_PACKET, rlp_result)))
}
fn return_rlp<FRlp, FError>(&self, io: &mut SyncIo, rlp: &UntrustedRlp, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
where FRlp : Fn(&SyncIo, &UntrustedRlp) -> RlpResponseResult,
fn return_rlp<FRlp, FError>(io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult,
FError : FnOnce(UtilError) -> String
{
let response = rlp_func(io, rlp);
let response = rlp_func(io, rlp, peer);
match response {
Err(e) => Err(e),
Ok(Some((packet_id, rlp_stream))) => {
@@ -1045,13 +1122,41 @@ impl ChainSync {
}
/// Dispatch incoming requests and responses
pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
let rlp = UntrustedRlp::new(data);
let result = match packet_id {
GET_BLOCK_BODIES_PACKET => ChainSync::return_rlp(io, &rlp, peer,
ChainSync::return_block_bodies,
|e| format!("Error sending block bodies: {:?}", e)),
GET_BLOCK_HEADERS_PACKET => ChainSync::return_rlp(io, &rlp, peer,
ChainSync::return_block_headers,
|e| format!("Error sending block headers: {:?}", e)),
GET_RECEIPTS_PACKET => ChainSync::return_rlp(io, &rlp, peer,
ChainSync::return_receipts,
|e| format!("Error sending receipts: {:?}", e)),
GET_NODE_DATA_PACKET => ChainSync::return_rlp(io, &rlp, peer,
ChainSync::return_node_data,
|e| format!("Error sending nodes: {:?}", e)),
_ => {
sync.write().unwrap().on_packet(io, peer, packet_id, data);
Ok(())
}
};
result.unwrap_or_else(|e| {
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e);
})
}
pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) {
debug!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer));
return;
}
let rlp = UntrustedRlp::new(data);
let result = match packet_id {
STATUS_PACKET => self.on_peer_status(io, peer, &rlp),
TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp),
@@ -1059,23 +1164,6 @@ impl ChainSync {
BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp),
NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp),
NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp),
GET_BLOCK_BODIES_PACKET => self.return_rlp(io, &rlp,
ChainSync::return_block_bodies,
|e| format!("Error sending block bodies: {:?}", e)),
GET_BLOCK_HEADERS_PACKET => self.return_rlp(io, &rlp,
ChainSync::return_block_headers,
|e| format!("Error sending block headers: {:?}", e)),
GET_RECEIPTS_PACKET => self.return_rlp(io, &rlp,
ChainSync::return_receipts,
|e| format!("Error sending receipts: {:?}", e)),
GET_NODE_DATA_PACKET => self.return_rlp(io, &rlp,
ChainSync::return_node_data,
|e| format!("Error sending nodes: {:?}", e)),
_ => {
debug!(target: "sync", "Unknown packet {}", packet_id);
Ok(())
@@ -1086,14 +1174,19 @@ impl ChainSync {
})
}
pub fn maintain_peers(&self, io: &mut SyncIo) {
pub fn maintain_peers(&mut self, io: &mut SyncIo) {
let tick = time::precise_time_s();
let mut aborting = Vec::new();
for (peer_id, peer) in &self.peers {
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
trace!(target:"sync", "Timeouted {}", peer_id);
trace!(target:"sync", "Timeout {}", peer_id);
io.disconnect_peer(*peer_id);
aborting.push(*peer_id);
}
}
for p in aborting {
self.on_peer_aborting(io, p);
}
}
fn check_resume(&mut self, io: &mut SyncIo) {
@@ -1116,10 +1209,11 @@ impl ChainSync {
let mut rlp_stream = RlpStream::new_list(blocks.len());
for block_hash in blocks {
let mut hash_rlp = RlpStream::new_list(2);
let difficulty = chain.block_total_difficulty(BlockID::Hash(block_hash.clone())).expect("Malformed block without a difficulty on the chain!");
let number = HeaderView::new(&chain.block_header(BlockID::Hash(block_hash.clone()))
.expect("chain.tree_route and chain.find_uncles only return hahses of blocks that are in the blockchain. qed.")).number();
hash_rlp.append(&block_hash);
hash_rlp.append(&difficulty);
rlp_stream.append_raw(&hash_rlp.out(), 1);
hash_rlp.append(&number);
rlp_stream.append_raw(hash_rlp.as_raw(), 1);
}
Some(rlp_stream.out())
}
@@ -1171,6 +1265,7 @@ impl ChainSync {
/// propagates latest block to lagging peers
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
let lucky_peers = self.select_lagging_peers(chain_info, io);
trace!(target: "sync", "Sending NewBlocks to {:?}", lucky_peers);
let mut sent = 0;
for (peer_id, _) in lucky_peers {
let rlp = ChainSync::create_latest_block_rlp(io.chain());
@@ -1185,6 +1280,7 @@ impl ChainSync {
/// propagates new known hashes to all peers
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
let lucky_peers = self.select_lagging_peers(chain_info, io);
trace!(target: "sync", "Sending NewHashes to {:?}", lucky_peers);
let mut sent = 0;
let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash();
for (peer_id, peer_number) in lucky_peers {
@@ -1218,7 +1314,7 @@ impl ChainSync {
return 0;
}
let mut transactions = io.chain().all_transactions();
let mut transactions = io.chain().pending_transactions();
if transactions.is_empty() {
return 0;
}
@@ -1268,7 +1364,7 @@ impl ChainSync {
self.check_resume(io);
}
/// called when block is imported to chain, updates transactions queue and propagates the blocks
/// called when block is imported to chain, updates transactions queue and propagates the blocks
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256]) {
if io.is_chain_queue_empty() {
// Propagate latests blocks
@@ -1338,7 +1434,7 @@ mod tests {
let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);
let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&[0xc0]));
let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&[0xc0]), 0);
assert!(result.is_ok());
}
@@ -1347,7 +1443,7 @@ mod tests {
fn return_receipts() {
let mut client = TestBlockChainClient::new();
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(H256::new(), &client);
let sync = dummy_sync_with_peer(H256::new(), &client);
let mut io = TestIo::new(&mut client, &mut queue, None);
let mut receipt_list = RlpStream::new_list(4);
@@ -1358,7 +1454,7 @@ mod tests {
let receipts_request = receipt_list.out();
// it returns rlp ONLY for hashes started with "f"
let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&receipts_request.clone()));
let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&receipts_request.clone()), 0);
assert!(result.is_ok());
let rlp_result = result.unwrap();
@@ -1368,7 +1464,7 @@ mod tests {
assert_eq!(603, rlp_result.unwrap().1.out().len());
io.sender = Some(2usize);
sync.on_packet(&mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request);
ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request);
assert_eq!(1, io.queue.len());
}
@@ -1406,33 +1502,33 @@ mod tests {
let io = TestIo::new(&mut client, &mut queue, None);
let unknown: H256 = H256::new();
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, false)));
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, false)), 0);
assert!(to_header_vec(result).is_empty());
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, true)));
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, true)), 0);
assert!(to_header_vec(result).is_empty());
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, true)));
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, true)), 0);
assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, false)));
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[2], 1, 0, false)), 0);
assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, false)));
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, false)), 0);
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, true)));
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&hashes[50], 3, 5, true)), 0);
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, true)));
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, true)), 0);
assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, false)));
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(2, 1, 0, false)), 0);
assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, false)));
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, false)), 0);
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]);
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, true)));
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_num_req(50, 3, 5, true)), 0);
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]);
}
@@ -1440,7 +1536,7 @@ mod tests {
fn return_nodes() {
let mut client = TestBlockChainClient::new();
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(H256::new(), &client);
let sync = dummy_sync_with_peer(H256::new(), &client);
let mut io = TestIo::new(&mut client, &mut queue, None);
let mut node_list = RlpStream::new_list(3);
@@ -1450,7 +1546,7 @@ mod tests {
let node_request = node_list.out();
// it returns rlp ONLY for hashes started with "f"
let result = ChainSync::return_node_data(&io, &UntrustedRlp::new(&node_request.clone()));
let result = ChainSync::return_node_data(&io, &UntrustedRlp::new(&node_request.clone()), 0);
assert!(result.is_ok());
let rlp_result = result.unwrap();
@@ -1460,7 +1556,8 @@ mod tests {
assert_eq!(34, rlp_result.unwrap().1.out().len());
io.sender = Some(2usize);
sync.on_packet(&mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request);
ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request);
assert_eq!(1, io.queue.len());
}