From f85e409ff7f68b3d8061206afc8f7f914c778f7e Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 27 May 2016 16:35:52 +0200 Subject: [PATCH 1/5] Make sure downloaded blocks are unmarked on send error --- sync/src/chain.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index e66fce0d5..6a54c648f 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -862,16 +862,12 @@ impl ChainSync { warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); } } - match sync.send(peer_id, packet_id, packet) { - Err(e) => { - debug!(target:"sync", "Error sending request: {:?}", e); - sync.disable_peer(peer_id); - } - Ok(_) => { - let mut peer = self.peers.get_mut(&peer_id).unwrap(); - peer.asking = asking; - peer.ask_time = time::precise_time_s(); - } + let mut peer = self.peers.get_mut(&peer_id).unwrap(); + peer.asking = asking; + peer.ask_time = time::precise_time_s(); + if let Err(e) = sync.send(peer_id, packet_id, packet) { + debug!(target:"sync", "Error sending request: {:?}", e); + sync.disable_peer(peer_id); } } From d1fc5a561180576cc865707982f0b305620f3335 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 27 May 2016 16:49:29 +0200 Subject: [PATCH 2/5] Tweaked some constansts for slower machines --- ethcore/src/client/client.rs | 2 +- sync/src/chain.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 13d678c14..ab1c21af1 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -254,7 +254,7 @@ impl Client where V: Verifier { /// This is triggered by a message coming from a block queue when the block is ready for insertion pub fn import_verified_blocks(&self, io: &IoChannel) -> usize { - let max_blocks_to_import = 128; + let max_blocks_to_import = 64; let mut imported_blocks = Vec::with_capacity(max_blocks_to_import); let mut invalid_blocks = HashSet::new(); diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 6a54c648f..dde287a57 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -133,7 +133,7 @@ const NODE_DATA_PACKET: u8 = 0x0e; const GET_RECEIPTS_PACKET: u8 = 0x0f; const RECEIPTS_PACKET: u8 = 0x10; -const CONNECTION_TIMEOUT_SEC: f64 = 10f64; +const CONNECTION_TIMEOUT_SEC: f64 = 15f64; #[derive(Copy, Clone, Eq, PartialEq, Debug)] /// Sync state From 0e905a06d937a745435a6fe0a50d1ba873874c5d Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 27 May 2016 17:26:50 +0200 Subject: [PATCH 3/5] Tweaked propagation order --- sync/src/chain.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index dde287a57..63ab8bcc8 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1255,15 +1255,15 @@ impl ChainSync { } fn propagate_latest_blocks(&mut self, io: &mut SyncIo) { - self.propagate_new_transactions(io); let chain_info = io.chain().chain_info(); if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { - let blocks = self.propagate_blocks(&chain_info, io); let hashes = self.propagate_new_hashes(&chain_info, io); + let blocks = self.propagate_blocks(&chain_info, io); if blocks != 0 || hashes != 0 { trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); } } + self.propagate_new_transactions(io); self.last_sent_block_number = chain_info.best_block_number; } From 1e8bf8c89df7b9db69726441a92d020f49d45cb7 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 27 May 2016 18:57:12 +0200 Subject: [PATCH 4/5] More tweaks --- ethcore/src/tests/client.rs | 4 ++-- sync/src/chain.rs | 25 +++++++++++++++---------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index ee2cb9c3c..d734b0b47 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -115,7 +115,7 @@ fn can_collect_garbage() { fn can_handle_long_fork() { let client_result = generate_dummy_client(1200); let client = client_result.reference(); - for _ in 0..10 { + for _ in 0..20 { client.import_verified_blocks(&IoChannel::disconnected()); } assert_eq!(1200, client.chain_info().best_block_number); @@ -124,7 +124,7 @@ fn can_handle_long_fork() { push_blocks_to_client(client, 49, 1201, 800); push_blocks_to_client(client, 53, 1201, 600); - for _ in 0..20 { + for _ in 0..40 { client.import_verified_blocks(&IoChannel::disconnected()); } assert_eq!(2000, client.chain_info().best_block_number); diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 63ab8bcc8..63f699c9e 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -113,11 +113,12 @@ const MAX_HEADERS_TO_SEND: usize = 512; const MAX_NODE_DATA_TO_SEND: usize = 1024; const MAX_RECEIPTS_TO_SEND: usize = 1024; const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256; -const MAX_HEADERS_TO_REQUEST: usize = 256; +const MAX_HEADERS_TO_REQUEST: usize = 128; const MAX_BODIES_TO_REQUEST: usize = 64; const MIN_PEERS_PROPAGATION: usize = 4; const MAX_PEERS_PROPAGATION: usize = 128; const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20; +const SUBCHAIN_SIZE: usize = 64; const STATUS_PACKET: u8 = 0x00; const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; @@ -639,7 +640,7 @@ impl ChainSync { self.sync_peer(io, p, false); } } - if !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) { + if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) { self.complete_sync(); } } @@ -665,7 +666,7 @@ impl ChainSync { return; } if self.state == SyncState::Waiting { - trace!(target: "sync", "Waiting for block queue"); + trace!(target: "sync", "Waiting for the block queue"); return; } (peer.latest_hash.clone(), peer.difficulty.clone()) @@ -689,7 +690,7 @@ impl ChainSync { // Request subchain headers trace!(target: "sync", "Starting sync with better chain"); let last = self.last_imported_hash.clone(); - self.request_headers_by_hash(io, peer_id, &last, 128, 255, false, PeerAsking::Heads); + self.request_headers_by_hash(io, peer_id, &last, SUBCHAIN_SIZE, MAX_HEADERS_TO_REQUEST - 1, false, PeerAsking::Heads); }, SyncState::Blocks | SyncState::NewBlocks => { if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown { @@ -704,6 +705,8 @@ impl ChainSync { fn start_sync_round(&mut self, io: &mut SyncIo) { self.state = SyncState::ChainHead; trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block); + // Check if need to retract to find the common block. The problem is that the peers still return headers by hash even + // from the non-canonical part of the tree. So we also retract if nothing has been imported last round. if self.imported_this_round.is_some() && self.imported_this_round.unwrap() == 0 && self.last_imported_block > 0 { match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) { Some(h) => { @@ -781,9 +784,13 @@ impl ChainSync { match io.chain().import_block(block) { Err(Error::Import(ImportError::AlreadyInChain)) => { + self.last_imported_block = number; + self.last_imported_hash = h.clone(); trace!(target: "sync", "Block already in chain {:?}", h); }, Err(Error::Import(ImportError::AlreadyQueued)) => { + self.last_imported_block = number; + self.last_imported_hash = h.clone(); trace!(target: "sync", "Block already queued {:?}", h); }, Ok(_) => { @@ -856,13 +863,10 @@ impl ChainSync { /// Generic request sender fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { - { - let peer = self.peers.get_mut(&peer_id).unwrap(); - if peer.asking != PeerAsking::Nothing { - warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); - } + let peer = self.peers.get_mut(&peer_id).unwrap(); + if peer.asking != PeerAsking::Nothing { + warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); } - let mut peer = self.peers.get_mut(&peer_id).unwrap(); peer.asking = asking; peer.ask_time = time::precise_time_s(); if let Err(e) = sync.send(peer_id, packet_id, packet) { @@ -1095,6 +1099,7 @@ impl ChainSync { let tick = time::precise_time_s(); for (peer_id, peer) in &self.peers { if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC { + trace!(target:"sync", "Timeouted {}", peer_id); io.disconnect_peer(*peer_id); } } From 7f3ba85a3f83a53f949d96ac0f9dedafafa39d9d Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 29 May 2016 00:38:10 +0200 Subject: [PATCH 5/5] Fixed block/hashes propagation --- sync/src/chain.rs | 35 +++++++++++++++++------------------ sync/src/tests/chain.rs | 5 +++-- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 63f699c9e..2f31f1d47 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1165,24 +1165,23 @@ impl ChainSync { .collect::>() } + + fn select_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> Vec<(PeerId, BlockNumber)> { + use rand::Rng; + let mut lagging_peers = self.get_lagging_peers(chain_info, io); + // take sqrt(x) peers + let mut count = (self.peers.len() as f64).powf(0.5).round() as usize; + count = min(count, MAX_PEERS_PROPAGATION); + count = max(count, MIN_PEERS_PROPAGATION); + ::rand::thread_rng().shuffle(&mut lagging_peers); + lagging_peers.into_iter().take(count).collect::>() + } + /// propagates latest block to lagging peers fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize { - let updated_peers = { - let lagging_peers = self.get_lagging_peers(chain_info, io); - - // sqrt(x)/x scaled to max u32 - let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; - let lucky_peers = match lagging_peers.len() { - 0 ... MIN_PEERS_PROPAGATION => lagging_peers, - _ => lagging_peers.into_iter().filter(|_| ::rand::random::() < fraction).collect::>() - }; - - // taking at max of MAX_PEERS_PROPAGATION - lucky_peers.iter().map(|&(id, _)| id.clone()).take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::>() - }; - + let lucky_peers = self.select_lagging_peers(chain_info, io); let mut sent = 0; - for peer_id in updated_peers { + for (peer_id, _) in lucky_peers { let rlp = ChainSync::create_latest_block_rlp(io.chain()); self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); self.peers.get_mut(&peer_id).unwrap().latest_hash = chain_info.best_block_hash.clone(); @@ -1194,12 +1193,12 @@ impl ChainSync { /// propagates new known hashes to all peers fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize { - let updated_peers = self.get_lagging_peers(chain_info, io); + let lucky_peers = self.select_lagging_peers(chain_info, io); let mut sent = 0; let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash(); - for (peer_id, peer_number) in updated_peers { + for (peer_id, peer_number) in lucky_peers { let mut peer_best = self.peers.get(&peer_id).unwrap().latest_hash.clone(); - if chain_info.best_block_number - peer_number > MAX_PEERS_PROPAGATION as BlockNumber { + if chain_info.best_block_number - peer_number > MAX_PEER_LAG_PROPAGATION as BlockNumber { // If we think peer is too far behind just send one latest hash peer_best = last_parent.clone(); } diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 2e3ec1f4c..09e83e358 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -159,7 +159,7 @@ fn propagate_hashes() { #[test] fn propagate_blocks() { - let mut net = TestNet::new(2); + let mut net = TestNet::new(20); net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); net.sync(); @@ -169,7 +169,8 @@ fn propagate_blocks() { assert!(!net.peer(0).queue.is_empty()); // NEW_BLOCK_PACKET - assert_eq!(0x07, net.peer(0).queue[0].packet_id); + let blocks = net.peer(0).queue.iter().filter(|p| p.packet_id == 0x7).count(); + assert!(blocks > 0); } #[test]