Sync fixes
This commit is contained in:
		
							parent
							
								
									aed348ce8b
								
							
						
					
					
						commit
						84732d4b94
					
				| @ -105,7 +105,7 @@ struct Verification { | ||||
| 	bad: HashSet<H256>, | ||||
| } | ||||
| 
 | ||||
| const MAX_UNVERIFIED_QUEUE_SIZE: usize = 50000; | ||||
| const MAX_UNVERIFIED_QUEUE_SIZE: usize = 2000; | ||||
| 
 | ||||
| impl BlockQueue { | ||||
| 	/// Creates a new queue instance.
 | ||||
|  | ||||
| @ -350,24 +350,26 @@ impl Client { | ||||
| 		self.chain.write().unwrap().configure_cache(pref_cache_size, max_cache_size); | ||||
| 	} | ||||
| 
 | ||||
| 	fn block_hash(&self, id: BlockId) -> Option<H256> { | ||||
| 	fn block_hash(chain: &BlockChain, id: BlockId) -> Option<H256> { | ||||
| 		match id { | ||||
| 			BlockId::Hash(hash) => Some(hash), | ||||
| 			BlockId::Number(number) => self.chain.read().unwrap().block_hash(number), | ||||
| 			BlockId::Earliest => self.chain.read().unwrap().block_hash(0), | ||||
| 			BlockId::Latest => Some(self.chain.read().unwrap().best_block_hash()) | ||||
| 			BlockId::Number(number) => chain.block_hash(number), | ||||
| 			BlockId::Earliest => chain.block_hash(0), | ||||
| 			BlockId::Latest => Some(chain.best_block_hash()) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| impl BlockChainClient for Client { | ||||
| 	fn block_header(&self, id: BlockId) -> Option<Bytes> { | ||||
| 		self.block_hash(id).and_then(|hash| self.chain.read().unwrap().block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec())) | ||||
| 		let chain = self.chain.read().unwrap(); | ||||
| 		Self::block_hash(&chain, id).and_then(|hash| chain.block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec())) | ||||
| 	} | ||||
| 
 | ||||
| 	fn block_body(&self, id: BlockId) -> Option<Bytes> { | ||||
| 		self.block_hash(id).and_then(|hash| { | ||||
| 			self.chain.read().unwrap().block(&hash).map(|bytes| { | ||||
| 		let chain = self.chain.read().unwrap(); | ||||
| 		Self::block_hash(&chain, id).and_then(|hash| { | ||||
| 			chain.block(&hash).map(|bytes| { | ||||
| 				let rlp = Rlp::new(&bytes); | ||||
| 				let mut body = RlpStream::new_list(2); | ||||
| 				body.append_raw(rlp.at(1).as_raw(), 1); | ||||
| @ -378,21 +380,24 @@ impl BlockChainClient for Client { | ||||
| 	} | ||||
| 
 | ||||
| 	fn block(&self, id: BlockId) -> Option<Bytes> { | ||||
| 		self.block_hash(id).and_then(|hash| { | ||||
| 			self.chain.read().unwrap().block(&hash) | ||||
| 		let chain = self.chain.read().unwrap(); | ||||
| 		Self::block_hash(&chain, id).and_then(|hash| { | ||||
| 			chain.block(&hash) | ||||
| 		}) | ||||
| 	} | ||||
| 
 | ||||
| 	fn block_status(&self, id: BlockId) -> BlockStatus { | ||||
| 		match self.block_hash(id) { | ||||
| 			Some(ref hash) if self.chain.read().unwrap().is_known(hash) => BlockStatus::InChain, | ||||
| 		let chain = self.chain.read().unwrap(); | ||||
| 		match Self::block_hash(&chain, id) { | ||||
| 			Some(ref hash) if chain.is_known(hash) => BlockStatus::InChain, | ||||
| 			Some(hash) => self.block_queue.read().unwrap().block_status(&hash), | ||||
| 			None => BlockStatus::Unknown | ||||
| 		} | ||||
| 	} | ||||
| 	
 | ||||
| 	fn block_total_difficulty(&self, id: BlockId) -> Option<U256> { | ||||
| 		self.block_hash(id).and_then(|hash| self.chain.read().unwrap().block_details(&hash)).map(|d| d.total_difficulty) | ||||
| 		let chain = self.chain.read().unwrap(); | ||||
| 		Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) | ||||
| 	} | ||||
| 
 | ||||
| 	fn code(&self, address: &Address) -> Option<Bytes> { | ||||
| @ -400,13 +405,14 @@ impl BlockChainClient for Client { | ||||
| 	} | ||||
| 
 | ||||
| 	fn transaction(&self, id: TransactionId) -> Option<LocalizedTransaction> { | ||||
| 		let chain = self.chain.read().unwrap(); | ||||
| 		match id { | ||||
| 			TransactionId::Hash(ref hash) => self.chain.read().unwrap().transaction_address(hash), | ||||
| 			TransactionId::Location(id, index) => self.block_hash(id).map(|hash| TransactionAddress { | ||||
| 			TransactionId::Hash(ref hash) => chain.transaction_address(hash), | ||||
| 			TransactionId::Location(id, index) => Self::block_hash(&chain, id).map(|hash| TransactionAddress { | ||||
| 				block_hash: hash, | ||||
| 				index: index | ||||
| 			}) | ||||
| 		}.and_then(|address| self.chain.read().unwrap().transaction(&address)) | ||||
| 		}.and_then(|address| chain.transaction(&address)) | ||||
| 	} | ||||
| 
 | ||||
| 	fn tree_route(&self, from: &H256, to: &H256) -> Option<TreeRoute> { | ||||
|  | ||||
| @ -583,7 +583,7 @@ impl ChainSync { | ||||
| 			trace!(target: "sync", "Starting sync with better chain"); | ||||
| 			self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); | ||||
| 		} | ||||
| 		else if self.state == SyncState::Blocks { | ||||
| 		else if self.state == SyncState::Blocks && io.chain().block_status(BlockId::Hash(peer_latest)) == BlockStatus::Unknown { | ||||
| 			self.request_blocks(io, peer_id); | ||||
| 		} | ||||
| 	} | ||||
| @ -1045,7 +1045,7 @@ impl ChainSync { | ||||
| 
 | ||||
| 	fn check_resume(&mut self, io: &mut SyncIo) { | ||||
| 		if !io.chain().queue_info().is_full() && self.state == SyncState::Waiting { | ||||
| 			self.state = SyncState::Idle; | ||||
| 			self.state = SyncState::Blocks; | ||||
| 			self.continue_sync(io); | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| @ -412,7 +412,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { | ||||
| 		let mut to_kill = Vec::new(); | ||||
| 		for e in self.connections.write().unwrap().iter_mut() { | ||||
| 			if let ConnectionEntry::Session(ref mut s) = *e.lock().unwrap().deref_mut() { | ||||
| 				if !s.keep_alive() { | ||||
| 				if !s.keep_alive(io) { | ||||
| 					s.disconnect(DisconnectReason::PingTimeout); | ||||
| 					to_kill.push(s.token()); | ||||
| 				} | ||||
|  | ||||
| @ -180,7 +180,7 @@ impl Session { | ||||
| 	} | ||||
| 
 | ||||
| 	/// Keep this session alive. Returns false if ping timeout happened
 | ||||
| 	pub fn keep_alive(&mut self) -> bool { | ||||
| 	pub fn keep_alive<Message>(&mut self, io: &IoContext<Message>) -> bool where Message: Send + Sync + Clone { | ||||
| 		let timed_out = if let Some(pong) = self.pong_time_ns { | ||||
| 			pong - self.ping_time_ns > PING_TIMEOUT_SEC * 1000_000_000 | ||||
| 		} else { | ||||
| @ -191,6 +191,7 @@ impl Session { | ||||
| 			if let Err(e) = self.send_ping() { | ||||
| 				debug!("Error sending ping message: {:?}", e); | ||||
| 			} | ||||
| 			io.update_registration(self.token()).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); | ||||
| 		} | ||||
| 		!timed_out | ||||
| 	} | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user