propagade initial
This commit is contained in:
parent
410d6533e0
commit
cc3f712fec
@ -14,4 +14,4 @@ clippy = "0.0.37"
|
|||||||
log = "0.3"
|
log = "0.3"
|
||||||
env_logger = "0.3"
|
env_logger = "0.3"
|
||||||
time = "0.1.34"
|
time = "0.1.34"
|
||||||
|
rand = "0.3.13"
|
||||||
|
@ -46,6 +46,8 @@ const MAX_NODE_DATA_TO_SEND: usize = 1024;
|
|||||||
const MAX_RECEIPTS_TO_SEND: usize = 1024;
|
const MAX_RECEIPTS_TO_SEND: usize = 1024;
|
||||||
const MAX_HEADERS_TO_REQUEST: usize = 512;
|
const MAX_HEADERS_TO_REQUEST: usize = 512;
|
||||||
const MAX_BODIES_TO_REQUEST: usize = 256;
|
const MAX_BODIES_TO_REQUEST: usize = 256;
|
||||||
|
const MIN_PEERS_PROPAGATION: usize = 4;
|
||||||
|
const MAX_PEERS_PROPAGATION: usize = 128;
|
||||||
|
|
||||||
const STATUS_PACKET: u8 = 0x00;
|
const STATUS_PACKET: u8 = 0x00;
|
||||||
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
||||||
@ -1026,13 +1028,82 @@ impl ChainSync {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/// Maintain other peers. Send out any new blocks and transactions
|
|
||||||
pub fn maintain_sync(&mut self, io: &mut SyncIo) {
|
fn check_resume(&mut self, io: &mut SyncIo) {
|
||||||
if !io.chain().queue_info().full && self.state == SyncState::Waiting {
|
if !io.chain().queue_info().full && self.state == SyncState::Waiting {
|
||||||
self.state = SyncState::Idle;
|
self.state = SyncState::Idle;
|
||||||
self.continue_sync(io);
|
self.continue_sync(io);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn create_new_hashes_rlp(chain: &BlockChainClient, from: &H256, to: &H256) -> Option<Bytes> {
|
||||||
|
match chain.tree_route(from, to) {
|
||||||
|
Some(route) => {
|
||||||
|
match route.blocks.len() {
|
||||||
|
0 => None,
|
||||||
|
_ => {
|
||||||
|
let mut rlp_stream = RlpStream::new_list(route.blocks.len());
|
||||||
|
for hash in route.blocks {
|
||||||
|
rlp_stream.append(&hash);
|
||||||
|
}
|
||||||
|
Some(rlp_stream.out())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn query_peer_latest_blocks(&self) -> Vec<(usize, H256)> {
|
||||||
|
self.peers.iter().map(|peer| (peer.0.clone(), peer.1.latest.clone())).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn propagade_blocks(&mut self, io: &mut SyncIo) -> usize {
|
||||||
|
let updated_peers = {
|
||||||
|
let chain = io.chain();
|
||||||
|
let chain_info = chain.chain_info();
|
||||||
|
let latest_hash = chain_info.best_block_hash;
|
||||||
|
|
||||||
|
let lagging_peers = self.query_peer_latest_blocks().iter().filter(|peer|
|
||||||
|
match io.chain().block_status(&peer.1)
|
||||||
|
{
|
||||||
|
BlockStatus::InChain => peer.1 != latest_hash,
|
||||||
|
_ => false
|
||||||
|
}).cloned().collect::<Vec<(usize, H256)>>();
|
||||||
|
|
||||||
|
let lucky_peers = match lagging_peers.len() {
|
||||||
|
0 ... MIN_PEERS_PROPAGATION => lagging_peers,
|
||||||
|
_ => lagging_peers.iter().filter(|_| ::rand::random::<u8>() < 64u8).cloned().collect::<Vec<(usize, H256)>>()
|
||||||
|
};
|
||||||
|
|
||||||
|
match lucky_peers.len() {
|
||||||
|
0 ... MAX_PEERS_PROPAGATION => lucky_peers,
|
||||||
|
_ => lucky_peers.iter().take(MAX_PEERS_PROPAGATION).cloned().collect::<Vec<(usize, H256)>>()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut sent = 0;
|
||||||
|
for (peer_id, peer_hash) in updated_peers {
|
||||||
|
sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &peer_hash, &io.chain().chain_info().best_block_hash) {
|
||||||
|
Some(rlp) => {
|
||||||
|
self.send_request(io, peer_id, PeerAsking::Nothing, NEW_BLOCK_HASHES_PACKET, rlp);
|
||||||
|
1
|
||||||
|
},
|
||||||
|
None => 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sent
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Maintain other peers. Send out any new blocks and transactions
|
||||||
|
pub fn maintain_sync(&mut self, io: &mut SyncIo) {
|
||||||
|
self.check_resume(io);
|
||||||
|
|
||||||
|
if self.state == SyncState::Idle {
|
||||||
|
let blocks_propagaded = self.propagade_blocks(io);
|
||||||
|
debug!(target: "sync", "Sent new blocks to peers: {:?}", blocks_propagaded);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -34,6 +34,7 @@ extern crate ethcore_util as util;
|
|||||||
extern crate ethcore;
|
extern crate ethcore;
|
||||||
extern crate env_logger;
|
extern crate env_logger;
|
||||||
extern crate time;
|
extern crate time;
|
||||||
|
extern crate rand;
|
||||||
|
|
||||||
use std::ops::*;
|
use std::ops::*;
|
||||||
use std::sync::*;
|
use std::sync::*;
|
||||||
|
@ -89,3 +89,31 @@ fn status_empty() {
|
|||||||
let net = TestNet::new(2);
|
let net = TestNet::new(2);
|
||||||
assert_eq!(net.peer(0).sync.status().state, SyncState::NotSynced);
|
assert_eq!(net.peer(0).sync.status().state, SyncState::NotSynced);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn status_packet() {
|
||||||
|
let mut net = TestNet::new(2);
|
||||||
|
net.peer_mut(0).chain.add_blocks(1000, false);
|
||||||
|
net.peer_mut(1).chain.add_blocks(1, false);
|
||||||
|
|
||||||
|
net.start();
|
||||||
|
|
||||||
|
net.sync_step_peer(0);
|
||||||
|
|
||||||
|
assert_eq!(1, net.peer(0).queue.len());
|
||||||
|
assert_eq!(0x00, net.peer(0).queue[0].packet_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn propagade() {
|
||||||
|
let mut net = TestNet::new(2);
|
||||||
|
net.peer_mut(0).chain.add_blocks(100, false);
|
||||||
|
net.peer_mut(1).chain.add_blocks(100, false);
|
||||||
|
net.sync();
|
||||||
|
|
||||||
|
net.peer_mut(0).chain.add_blocks(10, false);
|
||||||
|
net.sync_step_peer(0);
|
||||||
|
|
||||||
|
assert_eq!(1, net.peer(0).queue.len());
|
||||||
|
assert_eq!(0x01, net.peer(0).queue[0].packet_id);
|
||||||
|
}
|
@ -318,6 +318,11 @@ impl TestNet {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn sync_step_peer(&mut self, peer_num: usize) {
|
||||||
|
let mut peer = self.peer_mut(peer_num);
|
||||||
|
peer.sync.maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
|
||||||
|
}
|
||||||
|
|
||||||
pub fn restart_peer(&mut self, i: usize) {
|
pub fn restart_peer(&mut self, i: usize) {
|
||||||
let peer = self.peer_mut(i);
|
let peer = self.peer_mut(i);
|
||||||
peer.sync.restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
|
peer.sync.restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
|
||||||
|
Loading…
Reference in New Issue
Block a user