Merge branch 'master' of github.com:ethcore/parity into beta
This commit is contained in:
@@ -1231,6 +1231,14 @@ impl ChainSync {
|
||||
rlp_stream.out()
|
||||
}
|
||||
|
||||
/// creates latest block rlp for the given client
|
||||
fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes {
|
||||
let mut rlp_stream = RlpStream::new_list(2);
|
||||
rlp_stream.append_raw(&chain.block(BlockID::Hash(hash.clone())).expect("Block has just been sealed; qed"), 1);
|
||||
rlp_stream.append(&chain.block_total_difficulty(BlockID::Hash(hash.clone())).expect("Block has just been sealed; qed."));
|
||||
rlp_stream.out()
|
||||
}
|
||||
|
||||
/// returns peer ids that have less blocks than our chain
|
||||
fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec<(PeerId, BlockNumber)> {
|
||||
let latest_hash = chain_info.best_block_hash;
|
||||
@@ -1250,7 +1258,6 @@ impl ChainSync {
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
|
||||
fn select_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> Vec<(PeerId, BlockNumber)> {
|
||||
use rand::Rng;
|
||||
let mut lagging_peers = self.get_lagging_peers(chain_info, io);
|
||||
@@ -1263,13 +1270,24 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// propagates latest block to lagging peers
|
||||
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
|
||||
let lucky_peers = self.select_lagging_peers(chain_info, io);
|
||||
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256]) -> usize {
|
||||
let lucky_peers: Vec<_> = if sealed.is_empty() {
|
||||
self.select_lagging_peers(chain_info, io).iter().map(|&(id, _)| id).collect()
|
||||
} else {
|
||||
self.peers.keys().cloned().collect()
|
||||
};
|
||||
trace!(target: "sync", "Sending NewBlocks to {:?}", lucky_peers);
|
||||
let mut sent = 0;
|
||||
for (peer_id, _) in lucky_peers {
|
||||
let rlp = ChainSync::create_latest_block_rlp(io.chain());
|
||||
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
|
||||
for peer_id in lucky_peers {
|
||||
if sealed.is_empty() {
|
||||
let rlp = ChainSync::create_latest_block_rlp(io.chain());
|
||||
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
|
||||
} else {
|
||||
for h in sealed {
|
||||
let rlp = ChainSync::create_new_block_rlp(io.chain(), h);
|
||||
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
|
||||
}
|
||||
}
|
||||
self.peers.get_mut(&peer_id).unwrap().latest_hash = chain_info.best_block_hash.clone();
|
||||
self.peers.get_mut(&peer_id).unwrap().latest_number = Some(chain_info.best_block_number);
|
||||
sent += 1;
|
||||
@@ -1346,11 +1364,11 @@ impl ChainSync {
|
||||
sent
|
||||
}
|
||||
|
||||
fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
|
||||
fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) {
|
||||
let chain_info = io.chain().chain_info();
|
||||
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
|
||||
let hashes = self.propagate_new_hashes(&chain_info, io);
|
||||
let blocks = self.propagate_blocks(&chain_info, io);
|
||||
let blocks = self.propagate_blocks(&chain_info, io, sealed);
|
||||
if blocks != 0 || hashes != 0 {
|
||||
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
|
||||
}
|
||||
@@ -1365,10 +1383,10 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// called when block is imported to chain, updates transactions queue and propagates the blocks
|
||||
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256]) {
|
||||
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256]) {
|
||||
if io.is_chain_queue_empty() {
|
||||
// Propagate latests blocks
|
||||
self.propagate_latest_blocks(io);
|
||||
self.propagate_latest_blocks(io, sealed);
|
||||
}
|
||||
if !invalid.is_empty() {
|
||||
trace!(target: "sync", "Bad blocks in the queue, restarting");
|
||||
@@ -1637,7 +1655,26 @@ mod tests {
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let peer_count = sync.propagate_blocks(&chain_info, &mut io);
|
||||
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[]);
|
||||
|
||||
// 1 message should be send
|
||||
assert_eq!(1, io.queue.len());
|
||||
// 1 peer should be updated
|
||||
assert_eq!(1, peer_count);
|
||||
// NEW_BLOCK_PACKET
|
||||
assert_eq!(0x07, io.queue[0].packet_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sends_sealed_block() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let hash = client.block_hash(BlockID::Number(99)).unwrap();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()]);
|
||||
|
||||
// 1 message should be send
|
||||
assert_eq!(1, io.queue.len());
|
||||
@@ -1761,7 +1798,7 @@ mod tests {
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
sync.propagate_blocks(&chain_info, &mut io);
|
||||
sync.propagate_blocks(&chain_info, &mut io, &[]);
|
||||
|
||||
let data = &io.queue[0].data.clone();
|
||||
let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(data));
|
||||
@@ -1794,7 +1831,7 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[]);
|
||||
assert_eq!(io.chain.miner.status().transactions_in_future_queue, 0);
|
||||
assert_eq!(io.chain.miner.status().transactions_in_pending_queue, 1);
|
||||
}
|
||||
@@ -1808,7 +1845,7 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &good_blocks, &retracted_blocks);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[]);
|
||||
}
|
||||
|
||||
// then
|
||||
@@ -1833,10 +1870,10 @@ mod tests {
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
// when
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[]);
|
||||
assert_eq!(io.chain.miner.status().transactions_in_future_queue, 0);
|
||||
assert_eq!(io.chain.miner.status().transactions_in_pending_queue, 0);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[]);
|
||||
|
||||
// then
|
||||
let status = io.chain.miner.status();
|
||||
|
||||
@@ -196,9 +196,9 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
|
||||
#[cfg_attr(feature="dev", allow(single_match))]
|
||||
fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) {
|
||||
match *message {
|
||||
SyncMessage::NewChainBlocks { ref imported, ref invalid, ref enacted, ref retracted } => {
|
||||
SyncMessage::NewChainBlocks { ref imported, ref invalid, ref enacted, ref retracted, ref sealed } => {
|
||||
let mut sync_io = NetSyncIo::new(io, self.chain.deref());
|
||||
self.sync.write().unwrap().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted);
|
||||
self.sync.write().unwrap().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted, sealed);
|
||||
},
|
||||
_ => {/* Ignore other messages */},
|
||||
}
|
||||
|
||||
@@ -173,6 +173,6 @@ impl TestNet {
|
||||
|
||||
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
|
||||
let mut peer = self.peer_mut(peer_id);
|
||||
peer.sync.write().unwrap().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]);
|
||||
peer.sync.write().unwrap().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[], &[]);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user