Merge pull request #1164 from ethcore/sync

Sync fixes and tweaks
This commit is contained in:
Arkadiy Paronyan 2016-05-30 22:29:29 +02:00
commit 6d25e7f8b4
4 changed files with 46 additions and 45 deletions

View File

@ -254,7 +254,7 @@ impl<V> Client<V> where V: Verifier {
/// This is triggered by a message coming from a block queue when the block is ready for insertion /// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self, io: &IoChannel<NetSyncMessage>) -> usize { pub fn import_verified_blocks(&self, io: &IoChannel<NetSyncMessage>) -> usize {
let max_blocks_to_import = 128; let max_blocks_to_import = 64;
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import); let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new(); let mut invalid_blocks = HashSet::new();

View File

@ -115,7 +115,7 @@ fn can_collect_garbage() {
fn can_handle_long_fork() { fn can_handle_long_fork() {
let client_result = generate_dummy_client(1200); let client_result = generate_dummy_client(1200);
let client = client_result.reference(); let client = client_result.reference();
for _ in 0..10 { for _ in 0..20 {
client.import_verified_blocks(&IoChannel::disconnected()); client.import_verified_blocks(&IoChannel::disconnected());
} }
assert_eq!(1200, client.chain_info().best_block_number); assert_eq!(1200, client.chain_info().best_block_number);
@ -124,7 +124,7 @@ fn can_handle_long_fork() {
push_blocks_to_client(client, 49, 1201, 800); push_blocks_to_client(client, 49, 1201, 800);
push_blocks_to_client(client, 53, 1201, 600); push_blocks_to_client(client, 53, 1201, 600);
for _ in 0..20 { for _ in 0..40 {
client.import_verified_blocks(&IoChannel::disconnected()); client.import_verified_blocks(&IoChannel::disconnected());
} }
assert_eq!(2000, client.chain_info().best_block_number); assert_eq!(2000, client.chain_info().best_block_number);

View File

@ -113,11 +113,12 @@ const MAX_HEADERS_TO_SEND: usize = 512;
const MAX_NODE_DATA_TO_SEND: usize = 1024; const MAX_NODE_DATA_TO_SEND: usize = 1024;
const MAX_RECEIPTS_TO_SEND: usize = 1024; const MAX_RECEIPTS_TO_SEND: usize = 1024;
const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256; const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256;
const MAX_HEADERS_TO_REQUEST: usize = 256; const MAX_HEADERS_TO_REQUEST: usize = 128;
const MAX_BODIES_TO_REQUEST: usize = 64; const MAX_BODIES_TO_REQUEST: usize = 64;
const MIN_PEERS_PROPAGATION: usize = 4; const MIN_PEERS_PROPAGATION: usize = 4;
const MAX_PEERS_PROPAGATION: usize = 128; const MAX_PEERS_PROPAGATION: usize = 128;
const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20; const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20;
const SUBCHAIN_SIZE: usize = 64;
const STATUS_PACKET: u8 = 0x00; const STATUS_PACKET: u8 = 0x00;
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
@ -133,7 +134,7 @@ const NODE_DATA_PACKET: u8 = 0x0e;
const GET_RECEIPTS_PACKET: u8 = 0x0f; const GET_RECEIPTS_PACKET: u8 = 0x0f;
const RECEIPTS_PACKET: u8 = 0x10; const RECEIPTS_PACKET: u8 = 0x10;
const CONNECTION_TIMEOUT_SEC: f64 = 10f64; const CONNECTION_TIMEOUT_SEC: f64 = 15f64;
#[derive(Copy, Clone, Eq, PartialEq, Debug)] #[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Sync state /// Sync state
@ -639,7 +640,7 @@ impl ChainSync {
self.sync_peer(io, p, false); self.sync_peer(io, p, false);
} }
} }
if !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) { if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) {
self.complete_sync(); self.complete_sync();
} }
} }
@ -665,7 +666,7 @@ impl ChainSync {
return; return;
} }
if self.state == SyncState::Waiting { if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for block queue"); trace!(target: "sync", "Waiting for the block queue");
return; return;
} }
(peer.latest_hash.clone(), peer.difficulty.clone()) (peer.latest_hash.clone(), peer.difficulty.clone())
@ -689,7 +690,7 @@ impl ChainSync {
// Request subchain headers // Request subchain headers
trace!(target: "sync", "Starting sync with better chain"); trace!(target: "sync", "Starting sync with better chain");
let last = self.last_imported_hash.clone(); let last = self.last_imported_hash.clone();
self.request_headers_by_hash(io, peer_id, &last, 128, 255, false, PeerAsking::Heads); self.request_headers_by_hash(io, peer_id, &last, SUBCHAIN_SIZE, MAX_HEADERS_TO_REQUEST - 1, false, PeerAsking::Heads);
}, },
SyncState::Blocks | SyncState::NewBlocks => { SyncState::Blocks | SyncState::NewBlocks => {
if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown { if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown {
@ -704,6 +705,8 @@ impl ChainSync {
fn start_sync_round(&mut self, io: &mut SyncIo) { fn start_sync_round(&mut self, io: &mut SyncIo) {
self.state = SyncState::ChainHead; self.state = SyncState::ChainHead;
trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block); trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block);
// Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
// from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
if self.imported_this_round.is_some() && self.imported_this_round.unwrap() == 0 && self.last_imported_block > 0 { if self.imported_this_round.is_some() && self.imported_this_round.unwrap() == 0 && self.last_imported_block > 0 {
match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) { match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) {
Some(h) => { Some(h) => {
@ -781,9 +784,13 @@ impl ChainSync {
match io.chain().import_block(block) { match io.chain().import_block(block) {
Err(Error::Import(ImportError::AlreadyInChain)) => { Err(Error::Import(ImportError::AlreadyInChain)) => {
self.last_imported_block = number;
self.last_imported_hash = h.clone();
trace!(target: "sync", "Block already in chain {:?}", h); trace!(target: "sync", "Block already in chain {:?}", h);
}, },
Err(Error::Import(ImportError::AlreadyQueued)) => { Err(Error::Import(ImportError::AlreadyQueued)) => {
self.last_imported_block = number;
self.last_imported_hash = h.clone();
trace!(target: "sync", "Block already queued {:?}", h); trace!(target: "sync", "Block already queued {:?}", h);
}, },
Ok(_) => { Ok(_) => {
@ -856,22 +863,15 @@ impl ChainSync {
/// Generic request sender /// Generic request sender
fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) {
{
let peer = self.peers.get_mut(&peer_id).unwrap(); let peer = self.peers.get_mut(&peer_id).unwrap();
if peer.asking != PeerAsking::Nothing { if peer.asking != PeerAsking::Nothing {
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
} }
}
match sync.send(peer_id, packet_id, packet) {
Err(e) => {
debug!(target:"sync", "Error sending request: {:?}", e);
sync.disable_peer(peer_id);
}
Ok(_) => {
let mut peer = self.peers.get_mut(&peer_id).unwrap();
peer.asking = asking; peer.asking = asking;
peer.ask_time = time::precise_time_s(); peer.ask_time = time::precise_time_s();
} if let Err(e) = sync.send(peer_id, packet_id, packet) {
debug!(target:"sync", "Error sending request: {:?}", e);
sync.disable_peer(peer_id);
} }
} }
@ -1099,6 +1099,7 @@ impl ChainSync {
let tick = time::precise_time_s(); let tick = time::precise_time_s();
for (peer_id, peer) in &self.peers { for (peer_id, peer) in &self.peers {
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC { if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
trace!(target:"sync", "Timeouted {}", peer_id);
io.disconnect_peer(*peer_id); io.disconnect_peer(*peer_id);
} }
} }
@ -1164,24 +1165,23 @@ impl ChainSync {
.collect::<Vec<_>>() .collect::<Vec<_>>()
} }
fn select_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> Vec<(PeerId, BlockNumber)> {
use rand::Rng;
let mut lagging_peers = self.get_lagging_peers(chain_info, io);
// take sqrt(x) peers
let mut count = (self.peers.len() as f64).powf(0.5).round() as usize;
count = min(count, MAX_PEERS_PROPAGATION);
count = max(count, MIN_PEERS_PROPAGATION);
::rand::thread_rng().shuffle(&mut lagging_peers);
lagging_peers.into_iter().take(count).collect::<Vec<_>>()
}
/// propagates latest block to lagging peers /// propagates latest block to lagging peers
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize { fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
let updated_peers = { let lucky_peers = self.select_lagging_peers(chain_info, io);
let lagging_peers = self.get_lagging_peers(chain_info, io);
// sqrt(x)/x scaled to max u32
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
let lucky_peers = match lagging_peers.len() {
0 ... MIN_PEERS_PROPAGATION => lagging_peers,
_ => lagging_peers.into_iter().filter(|_| ::rand::random::<u32>() < fraction).collect::<Vec<_>>()
};
// taking at max of MAX_PEERS_PROPAGATION
lucky_peers.iter().map(|&(id, _)| id.clone()).take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::<Vec<PeerId>>()
};
let mut sent = 0; let mut sent = 0;
for peer_id in updated_peers { for (peer_id, _) in lucky_peers {
let rlp = ChainSync::create_latest_block_rlp(io.chain()); let rlp = ChainSync::create_latest_block_rlp(io.chain());
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
self.peers.get_mut(&peer_id).unwrap().latest_hash = chain_info.best_block_hash.clone(); self.peers.get_mut(&peer_id).unwrap().latest_hash = chain_info.best_block_hash.clone();
@ -1193,12 +1193,12 @@ impl ChainSync {
/// propagates new known hashes to all peers /// propagates new known hashes to all peers
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize { fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
let updated_peers = self.get_lagging_peers(chain_info, io); let lucky_peers = self.select_lagging_peers(chain_info, io);
let mut sent = 0; let mut sent = 0;
let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash(); let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash();
for (peer_id, peer_number) in updated_peers { for (peer_id, peer_number) in lucky_peers {
let mut peer_best = self.peers.get(&peer_id).unwrap().latest_hash.clone(); let mut peer_best = self.peers.get(&peer_id).unwrap().latest_hash.clone();
if chain_info.best_block_number - peer_number > MAX_PEERS_PROPAGATION as BlockNumber { if chain_info.best_block_number - peer_number > MAX_PEER_LAG_PROPAGATION as BlockNumber {
// If we think peer is too far behind just send one latest hash // If we think peer is too far behind just send one latest hash
peer_best = last_parent.clone(); peer_best = last_parent.clone();
} }
@ -1259,15 +1259,15 @@ impl ChainSync {
} }
fn propagate_latest_blocks(&mut self, io: &mut SyncIo) { fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
self.propagate_new_transactions(io);
let chain_info = io.chain().chain_info(); let chain_info = io.chain().chain_info();
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let blocks = self.propagate_blocks(&chain_info, io);
let hashes = self.propagate_new_hashes(&chain_info, io); let hashes = self.propagate_new_hashes(&chain_info, io);
let blocks = self.propagate_blocks(&chain_info, io);
if blocks != 0 || hashes != 0 { if blocks != 0 || hashes != 0 {
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
} }
} }
self.propagate_new_transactions(io);
self.last_sent_block_number = chain_info.best_block_number; self.last_sent_block_number = chain_info.best_block_number;
} }

View File

@ -159,7 +159,7 @@ fn propagate_hashes() {
#[test] #[test]
fn propagate_blocks() { fn propagate_blocks() {
let mut net = TestNet::new(2); let mut net = TestNet::new(20);
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.sync(); net.sync();
@ -169,7 +169,8 @@ fn propagate_blocks() {
assert!(!net.peer(0).queue.is_empty()); assert!(!net.peer(0).queue.is_empty());
// NEW_BLOCK_PACKET // NEW_BLOCK_PACKET
assert_eq!(0x07, net.peer(0).queue[0].packet_id); let blocks = net.peer(0).queue.iter().filter(|p| p.packet_id == 0x7).count();
assert!(blocks > 0);
} }
#[test] #[test]