many fixes
This commit is contained in:
parent
b01f954b05
commit
b606df451e
@ -64,6 +64,7 @@ const MAX_HEADERS_TO_REQUEST: usize = 512;
|
|||||||
const MAX_BODIES_TO_REQUEST: usize = 256;
|
const MAX_BODIES_TO_REQUEST: usize = 256;
|
||||||
const MIN_PEERS_PROPAGATION: usize = 4;
|
const MIN_PEERS_PROPAGATION: usize = 4;
|
||||||
const MAX_PEERS_PROPAGATION: usize = 128;
|
const MAX_PEERS_PROPAGATION: usize = 128;
|
||||||
|
const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20;
|
||||||
|
|
||||||
const STATUS_PACKET: u8 = 0x00;
|
const STATUS_PACKET: u8 = 0x00;
|
||||||
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
||||||
@ -136,7 +137,7 @@ pub struct SyncStatus {
|
|||||||
pub num_active_peers: usize,
|
pub num_active_peers: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, Debug)]
|
#[derive(PartialEq, Eq, Debug, Clone)]
|
||||||
/// Peer data type requested
|
/// Peer data type requested
|
||||||
enum PeerAsking {
|
enum PeerAsking {
|
||||||
Nothing,
|
Nothing,
|
||||||
@ -144,6 +145,7 @@ enum PeerAsking {
|
|||||||
BlockBodies,
|
BlockBodies,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
/// Syncing peer information
|
/// Syncing peer information
|
||||||
struct PeerInfo {
|
struct PeerInfo {
|
||||||
/// eth protocol version
|
/// eth protocol version
|
||||||
@ -162,6 +164,8 @@ struct PeerInfo {
|
|||||||
asking_blocks: Vec<BlockNumber>,
|
asking_blocks: Vec<BlockNumber>,
|
||||||
/// Request timestamp
|
/// Request timestamp
|
||||||
ask_time: f64,
|
ask_time: f64,
|
||||||
|
/// Latest block number
|
||||||
|
latest_number: BlockNumber
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Blockchain sync handler.
|
/// Blockchain sync handler.
|
||||||
@ -267,7 +271,7 @@ impl ChainSync {
|
|||||||
|
|
||||||
/// Called by peer to report status
|
/// Called by peer to report status
|
||||||
fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||||
let peer = PeerInfo {
|
let mut peer = PeerInfo {
|
||||||
protocol_version: try!(r.val_at(0)),
|
protocol_version: try!(r.val_at(0)),
|
||||||
network_id: try!(r.val_at(1)),
|
network_id: try!(r.val_at(1)),
|
||||||
difficulty: try!(r.val_at(2)),
|
difficulty: try!(r.val_at(2)),
|
||||||
@ -276,8 +280,13 @@ impl ChainSync {
|
|||||||
asking: PeerAsking::Nothing,
|
asking: PeerAsking::Nothing,
|
||||||
asking_blocks: Vec::new(),
|
asking_blocks: Vec::new(),
|
||||||
ask_time: 0f64,
|
ask_time: 0f64,
|
||||||
|
latest_number: 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if io.chain().block_status(&peer.latest) == BlockStatus::InChain {
|
||||||
|
peer.latest_number = HeaderView::new(&io.chain().block_header(&peer.latest).unwrap()).number();
|
||||||
|
}
|
||||||
|
|
||||||
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis);
|
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis);
|
||||||
|
|
||||||
let chain_info = io.chain().chain_info();
|
let chain_info = io.chain().chain_info();
|
||||||
@ -441,6 +450,8 @@ impl ChainSync {
|
|||||||
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
|
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
|
||||||
Err(ImportError::AlreadyInChain) => {
|
Err(ImportError::AlreadyInChain) => {
|
||||||
trace!(target: "sync", "New block already in chain {:?}", h);
|
trace!(target: "sync", "New block already in chain {:?}", h);
|
||||||
|
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
|
||||||
|
peer.latest_number = max(peer.latest_number, header_view.number());
|
||||||
},
|
},
|
||||||
Err(ImportError::AlreadyQueued) => {
|
Err(ImportError::AlreadyQueued) => {
|
||||||
trace!(target: "sync", "New block already queued {:?}", h);
|
trace!(target: "sync", "New block already queued {:?}", h);
|
||||||
@ -471,6 +482,7 @@ impl ChainSync {
|
|||||||
{
|
{
|
||||||
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
|
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
|
||||||
peer.latest = header_view.sha3();
|
peer.latest = header_view.sha3();
|
||||||
|
peer.latest_number = header_view.number();
|
||||||
}
|
}
|
||||||
self.sync_peer(io, peer_id, true);
|
self.sync_peer(io, peer_id, true);
|
||||||
}
|
}
|
||||||
@ -638,6 +650,7 @@ impl ChainSync {
|
|||||||
if start == 0 {
|
if start == 0 {
|
||||||
self.have_common_block = true; //reached genesis
|
self.have_common_block = true; //reached genesis
|
||||||
self.last_imported_hash = Some(chain_info.genesis_hash);
|
self.last_imported_hash = Some(chain_info.genesis_hash);
|
||||||
|
self.last_imported_block = Some(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if self.have_common_block {
|
if self.have_common_block {
|
||||||
@ -1032,10 +1045,6 @@ impl ChainSync {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Maintain other peers. Send out any new blocks and transactions
|
|
||||||
pub fn _maintain_sync(&mut self, _io: &mut SyncIo) {
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn maintain_peers(&self, io: &mut SyncIo) {
|
pub fn maintain_peers(&self, io: &mut SyncIo) {
|
||||||
let tick = time::precise_time_s();
|
let tick = time::precise_time_s();
|
||||||
for (peer_id, peer) in &self.peers {
|
for (peer_id, peer) in &self.peers {
|
||||||
@ -1070,41 +1079,39 @@ impl ChainSync {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn query_peer_latest_blocks(&self) -> Vec<(usize, H256)> {
|
fn get_lagging_peers(&self, io: &SyncIo) -> Vec<usize> {
|
||||||
self.peers.iter().map(|peer| (peer.0.clone(), peer.1.latest.clone())).collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_lagging_peers(&self, io: &SyncIo) -> Vec<(usize, H256)> {
|
|
||||||
let chain = io.chain();
|
let chain = io.chain();
|
||||||
let chain_info = chain.chain_info();
|
let chain_info = chain.chain_info();
|
||||||
let latest_hash = chain_info.best_block_hash;
|
let latest_hash = chain_info.best_block_hash;
|
||||||
self.query_peer_latest_blocks().iter().filter(|peer|
|
let latest_number = chain_info.best_block_number;
|
||||||
match io.chain().block_status(&peer.1)
|
self.peers.iter().filter(|&(peer_id, peer_info)|
|
||||||
|
match io.chain().block_status(&peer_info.latest)
|
||||||
{
|
{
|
||||||
BlockStatus::InChain => peer.1 != latest_hash,
|
BlockStatus::InChain => peer_info.latest != latest_hash && latest_number - peer_info.latest_number < MAX_PEER_LAG_PROPAGATION,
|
||||||
_ => false
|
_ => false
|
||||||
}).cloned().collect::<Vec<(usize, H256)>>()
|
})
|
||||||
|
.map(|(peer_id, peer_info)| peer_id)
|
||||||
|
.cloned().collect::<Vec<usize>>()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn propagade_blocks(&mut self, io: &mut SyncIo) -> usize {
|
fn propagade_blocks(&mut self, io: &mut SyncIo) -> usize {
|
||||||
let updated_peers = {
|
let updated_peers = {
|
||||||
|
|
||||||
let lagging_peers = self.get_lagging_peers(io);
|
let lagging_peers = self.get_lagging_peers(io);
|
||||||
|
|
||||||
let lucky_peers = match lagging_peers.len() {
|
// sqrt(x)/x scaled to max u32
|
||||||
|
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
|
||||||
|
let mut lucky_peers = match lagging_peers.len() {
|
||||||
0 ... MIN_PEERS_PROPAGATION => lagging_peers,
|
0 ... MIN_PEERS_PROPAGATION => lagging_peers,
|
||||||
_ => lagging_peers.iter().filter(|_| ::rand::random::<u8>() < 64u8).cloned().collect::<Vec<(usize, H256)>>()
|
_ => lagging_peers.iter().filter(|_| ::rand::random::<u32>() < fraction).cloned().collect::<Vec<usize>>()
|
||||||
};
|
};
|
||||||
|
|
||||||
match lucky_peers.len() {
|
// taking at max of MAX_PEERS_PROPAGATION
|
||||||
0 ... MAX_PEERS_PROPAGATION => lucky_peers,
|
lucky_peers.iter().take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).cloned().collect::<Vec<usize>>()
|
||||||
_ => lucky_peers.iter().take(MAX_PEERS_PROPAGATION).cloned().collect::<Vec<(usize, H256)>>()
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut sent = 0;
|
let mut sent = 0;
|
||||||
for (peer_id, peer_hash) in updated_peers {
|
for peer_id in updated_peers {
|
||||||
sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &peer_hash, &io.chain().chain_info().best_block_hash) {
|
sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers[&peer_id].latest, &io.chain().chain_info().best_block_hash) {
|
||||||
Some(rlp) => {
|
Some(rlp) => {
|
||||||
self.send_request(io, peer_id, PeerAsking::Nothing, NEW_BLOCK_HASHES_PACKET, rlp);
|
self.send_request(io, peer_id, PeerAsking::Nothing, NEW_BLOCK_HASHES_PACKET, rlp);
|
||||||
1
|
1
|
||||||
@ -1124,6 +1131,16 @@ impl ChainSync {
|
|||||||
trace!(target: "sync", "Sent new blocks to peers: {:?}", blocks_propagaded);
|
trace!(target: "sync", "Sent new blocks to peers: {:?}", blocks_propagaded);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn get_peer_latet(&self, peer_id: usize) -> H256 {
|
||||||
|
self.peers[&peer_id].latest.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn get_peer_latest_number(&self, peer_id: usize) -> BlockNumber {
|
||||||
|
self.peers[&peer_id].latest_number
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -1210,6 +1227,7 @@ mod tests {
|
|||||||
genesis: H256::zero(),
|
genesis: H256::zero(),
|
||||||
network_id: U256::zero(),
|
network_id: U256::zero(),
|
||||||
latest: peer_latest_hash,
|
latest: peer_latest_hash,
|
||||||
|
latest_number: 90,
|
||||||
difficulty: U256::zero(),
|
difficulty: U256::zero(),
|
||||||
asking: PeerAsking::Nothing,
|
asking: PeerAsking::Nothing,
|
||||||
asking_blocks: Vec::<BlockNumber>::new(),
|
asking_blocks: Vec::<BlockNumber>::new(),
|
||||||
@ -1251,17 +1269,17 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn sends_packet_to_lagging_peer() {
|
fn sends_packet_to_lagging_peer() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(20, false);
|
client.add_blocks(100, false);
|
||||||
let mut queue = VecDeque::new();
|
let mut queue = VecDeque::new();
|
||||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||||
|
|
||||||
let block_count = sync.propagade_blocks(&mut io);
|
let peer_count = sync.propagade_blocks(&mut io);
|
||||||
|
|
||||||
// 1 message should be send
|
// 1 message should be send
|
||||||
assert_eq!(1, io.queue.len());
|
assert_eq!(1, io.queue.len());
|
||||||
// 1 peer should be updated
|
// 1 peer should be updated
|
||||||
assert_eq!(1, block_count);
|
assert_eq!(1, peer_count);
|
||||||
// NEW_BLOCK_HASHES_PACKET
|
// NEW_BLOCK_HASHES_PACKET
|
||||||
assert_eq!(0x01, io.queue[0].packet_id);
|
assert_eq!(0x01, io.queue[0].packet_id);
|
||||||
}
|
}
|
||||||
|
@ -126,10 +126,18 @@ fn propagade() {
|
|||||||
net.peer_mut(1).chain.add_blocks(1000, false);
|
net.peer_mut(1).chain.add_blocks(1000, false);
|
||||||
net.peer_mut(2).chain.add_blocks(1000, false);
|
net.peer_mut(2).chain.add_blocks(1000, false);
|
||||||
net.sync();
|
net.sync();
|
||||||
|
|
||||||
|
|
||||||
let status = net.peer(0).sync.status();
|
let status = net.peer(0).sync.status();
|
||||||
assert_eq!(status.state, SyncState::Idle);
|
assert_eq!(status.state, SyncState::Idle);
|
||||||
|
|
||||||
net.peer_mut(0).chain.add_blocks(10, false);
|
net.peer_mut(0).chain.add_blocks(10, false);
|
||||||
|
assert_eq!(1010, net.peer(0).chain.chain_info().best_block_number);
|
||||||
|
assert_eq!(1000, net.peer(1).chain.chain_info().best_block_number);
|
||||||
|
assert_eq!(1000, net.peer(2).chain.chain_info().best_block_number);
|
||||||
|
|
||||||
|
assert_eq!(net.peer(0).sync.get_peer_latest_number(1), 1000);
|
||||||
|
|
||||||
net.sync_step_peer(0);
|
net.sync_step_peer(0);
|
||||||
|
|
||||||
// 2 peers to sync
|
// 2 peers to sync
|
||||||
|
Loading…
Reference in New Issue
Block a user