diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index ee2cb9c3c..d734b0b47 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -115,7 +115,7 @@ fn can_collect_garbage() { fn can_handle_long_fork() { let client_result = generate_dummy_client(1200); let client = client_result.reference(); - for _ in 0..10 { + for _ in 0..20 { client.import_verified_blocks(&IoChannel::disconnected()); } assert_eq!(1200, client.chain_info().best_block_number); @@ -124,7 +124,7 @@ fn can_handle_long_fork() { push_blocks_to_client(client, 49, 1201, 800); push_blocks_to_client(client, 53, 1201, 600); - for _ in 0..20 { + for _ in 0..40 { client.import_verified_blocks(&IoChannel::disconnected()); } assert_eq!(2000, client.chain_info().best_block_number); diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 63ab8bcc8..63f699c9e 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -113,11 +113,12 @@ const MAX_HEADERS_TO_SEND: usize = 512; const MAX_NODE_DATA_TO_SEND: usize = 1024; const MAX_RECEIPTS_TO_SEND: usize = 1024; const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256; -const MAX_HEADERS_TO_REQUEST: usize = 256; +const MAX_HEADERS_TO_REQUEST: usize = 128; const MAX_BODIES_TO_REQUEST: usize = 64; const MIN_PEERS_PROPAGATION: usize = 4; const MAX_PEERS_PROPAGATION: usize = 128; const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20; +const SUBCHAIN_SIZE: usize = 64; const STATUS_PACKET: u8 = 0x00; const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; @@ -639,7 +640,7 @@ impl ChainSync { self.sync_peer(io, p, false); } } - if !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) { + if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) { self.complete_sync(); } } @@ -665,7 +666,7 @@ impl ChainSync { return; } if self.state == SyncState::Waiting { - trace!(target: "sync", "Waiting for block queue"); + trace!(target: "sync", "Waiting for the block queue"); return; } (peer.latest_hash.clone(), peer.difficulty.clone()) @@ -689,7 +690,7 @@ impl ChainSync { // Request subchain headers trace!(target: "sync", "Starting sync with better chain"); let last = self.last_imported_hash.clone(); - self.request_headers_by_hash(io, peer_id, &last, 128, 255, false, PeerAsking::Heads); + self.request_headers_by_hash(io, peer_id, &last, SUBCHAIN_SIZE, MAX_HEADERS_TO_REQUEST - 1, false, PeerAsking::Heads); }, SyncState::Blocks | SyncState::NewBlocks => { if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown { @@ -704,6 +705,8 @@ impl ChainSync { fn start_sync_round(&mut self, io: &mut SyncIo) { self.state = SyncState::ChainHead; trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block); + // Check if need to retract to find the common block. The problem is that the peers still return headers by hash even + // from the non-canonical part of the tree. So we also retract if nothing has been imported last round. if self.imported_this_round.is_some() && self.imported_this_round.unwrap() == 0 && self.last_imported_block > 0 { match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) { Some(h) => { @@ -781,9 +784,13 @@ impl ChainSync { match io.chain().import_block(block) { Err(Error::Import(ImportError::AlreadyInChain)) => { + self.last_imported_block = number; + self.last_imported_hash = h.clone(); trace!(target: "sync", "Block already in chain {:?}", h); }, Err(Error::Import(ImportError::AlreadyQueued)) => { + self.last_imported_block = number; + self.last_imported_hash = h.clone(); trace!(target: "sync", "Block already queued {:?}", h); }, Ok(_) => { @@ -856,13 +863,10 @@ impl ChainSync { /// Generic request sender fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { - { - let peer = self.peers.get_mut(&peer_id).unwrap(); - if peer.asking != PeerAsking::Nothing { - warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); - } + let peer = self.peers.get_mut(&peer_id).unwrap(); + if peer.asking != PeerAsking::Nothing { + warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); } - let mut peer = self.peers.get_mut(&peer_id).unwrap(); peer.asking = asking; peer.ask_time = time::precise_time_s(); if let Err(e) = sync.send(peer_id, packet_id, packet) { @@ -1095,6 +1099,7 @@ impl ChainSync { let tick = time::precise_time_s(); for (peer_id, peer) in &self.peers { if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC { + trace!(target:"sync", "Timeouted {}", peer_id); io.disconnect_peer(*peer_id); } }