Improve block and transaction propagation (#9954)
* Refactor sync to add priority tasks. * Send priority tasks notifications. * Propagate blocks, optimize transactions. * Implement transaction propagation. Use sync_channel. * Tone down info. * Prevent deadlock by not waiting forever for sync lock. * Fix lock order. * Don't use sync_channel to prevent deadlocks. * Fix tests.
This commit is contained in:
		
							parent
							
								
									14c9cbd40e
								
							
						
					
					
						commit
						0b5bbf6048
					
				@ -106,7 +106,13 @@ impl ClientService {
 | 
			
		||||
		info!("Configured for {} using {} engine", Colour::White.bold().paint(spec.name.clone()), Colour::Yellow.bold().paint(spec.engine.name()));
 | 
			
		||||
 | 
			
		||||
		let pruning = config.pruning;
 | 
			
		||||
		let client = Client::new(config, &spec, blockchain_db.clone(), miner.clone(), io_service.channel())?;
 | 
			
		||||
		let client = Client::new(
 | 
			
		||||
			config,
 | 
			
		||||
			&spec,
 | 
			
		||||
			blockchain_db.clone(),
 | 
			
		||||
			miner.clone(),
 | 
			
		||||
			io_service.channel(),
 | 
			
		||||
		)?;
 | 
			
		||||
		miner.set_io_channel(io_service.channel());
 | 
			
		||||
		miner.set_in_chain_checker(&client.clone());
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -15,7 +15,7 @@
 | 
			
		||||
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
use bytes::Bytes;
 | 
			
		||||
use ethereum_types::H256;
 | 
			
		||||
use ethereum_types::{H256, U256};
 | 
			
		||||
use transaction::UnverifiedTransaction;
 | 
			
		||||
use blockchain::ImportRoute;
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
@ -141,7 +141,15 @@ pub trait ChainNotify : Send + Sync {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// fires when chain broadcasts a message
 | 
			
		||||
	fn broadcast(&self, _message_type: ChainMessageType) {}
 | 
			
		||||
	fn broadcast(&self, _message_type: ChainMessageType) {
 | 
			
		||||
		// does nothing by default
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// fires when new block is about to be imported
 | 
			
		||||
	/// implementations should be light
 | 
			
		||||
	fn block_pre_import(&self, _bytes: &Bytes, _hash: &H256, _difficulty: &U256) {
 | 
			
		||||
		// does nothing by default
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// fires when new transactions are received from a peer
 | 
			
		||||
	fn transactions_received(&self,
 | 
			
		||||
 | 
			
		||||
@ -881,7 +881,7 @@ impl Client {
 | 
			
		||||
	/// Flush the block import queue.
 | 
			
		||||
	pub fn flush_queue(&self) {
 | 
			
		||||
		self.importer.block_queue.flush();
 | 
			
		||||
		while !self.importer.block_queue.queue_info().is_empty() {
 | 
			
		||||
		while !self.importer.block_queue.is_empty() {
 | 
			
		||||
			self.import_verified_blocks();
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -1423,8 +1423,21 @@ impl ImportBlock for Client {
 | 
			
		||||
			bail!(EthcoreErrorKind::Block(BlockError::UnknownParent(unverified.parent_hash())));
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		let raw = if self.importer.block_queue.is_empty() {
 | 
			
		||||
			Some((
 | 
			
		||||
				unverified.bytes.clone(),
 | 
			
		||||
				unverified.header.hash(),
 | 
			
		||||
				*unverified.header.difficulty(),
 | 
			
		||||
			))
 | 
			
		||||
		} else { None };
 | 
			
		||||
 | 
			
		||||
		match self.importer.block_queue.import(unverified) {
 | 
			
		||||
			Ok(res) => Ok(res),
 | 
			
		||||
			Ok(hash) => {
 | 
			
		||||
				if let Some((raw, hash, difficulty)) = raw {
 | 
			
		||||
					self.notify(move |n| n.block_pre_import(&raw, &hash, &difficulty));
 | 
			
		||||
				}
 | 
			
		||||
				Ok(hash)
 | 
			
		||||
			},
 | 
			
		||||
			// we only care about block errors (not import errors)
 | 
			
		||||
			Err((block, EthcoreError(EthcoreErrorKind::Block(err), _))) => {
 | 
			
		||||
				self.importer.bad_blocks.report(block.bytes, format!("{:?}", err));
 | 
			
		||||
@ -1878,6 +1891,10 @@ impl BlockChainClient for Client {
 | 
			
		||||
		self.importer.block_queue.queue_info()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn is_queue_empty(&self) -> bool {
 | 
			
		||||
		self.importer.block_queue.is_empty()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn clear_queue(&self) {
 | 
			
		||||
		self.importer.block_queue.clear();
 | 
			
		||||
	}
 | 
			
		||||
@ -2288,7 +2305,11 @@ impl ScheduleInfo for Client {
 | 
			
		||||
impl ImportSealedBlock for Client {
 | 
			
		||||
	fn import_sealed_block(&self, block: SealedBlock) -> EthcoreResult<H256> {
 | 
			
		||||
		let start = Instant::now();
 | 
			
		||||
		let raw = block.rlp_bytes();
 | 
			
		||||
		let header = block.header().clone();
 | 
			
		||||
		let hash = header.hash();
 | 
			
		||||
		self.notify(|n| n.block_pre_import(&raw, &hash, header.difficulty()));
 | 
			
		||||
 | 
			
		||||
		let route = {
 | 
			
		||||
			// Do a super duper basic verification to detect potential bugs
 | 
			
		||||
			if let Err(e) = self.engine.verify_block_basic(&header) {
 | 
			
		||||
@ -2306,15 +2327,14 @@ impl ImportSealedBlock for Client {
 | 
			
		||||
			let block_data = block.rlp_bytes();
 | 
			
		||||
 | 
			
		||||
			let route = self.importer.commit_block(block, &header, encoded::Block::new(block_data), self);
 | 
			
		||||
			trace!(target: "client", "Imported sealed block #{} ({})", header.number(), header.hash());
 | 
			
		||||
			trace!(target: "client", "Imported sealed block #{} ({})", header.number(), hash);
 | 
			
		||||
			self.state_db.write().sync_cache(&route.enacted, &route.retracted, false);
 | 
			
		||||
			route
 | 
			
		||||
		};
 | 
			
		||||
		let h = header.hash();
 | 
			
		||||
		let route = ChainRoute::from([route].as_ref());
 | 
			
		||||
		self.importer.miner.chain_new_blocks(
 | 
			
		||||
			self,
 | 
			
		||||
			&[h],
 | 
			
		||||
			&[hash],
 | 
			
		||||
			&[],
 | 
			
		||||
			route.enacted(),
 | 
			
		||||
			route.retracted(),
 | 
			
		||||
@ -2322,16 +2342,16 @@ impl ImportSealedBlock for Client {
 | 
			
		||||
		);
 | 
			
		||||
		self.notify(|notify| {
 | 
			
		||||
			notify.new_blocks(
 | 
			
		||||
				vec![h],
 | 
			
		||||
				vec![hash],
 | 
			
		||||
				vec![],
 | 
			
		||||
				route.clone(),
 | 
			
		||||
				vec![h],
 | 
			
		||||
				vec![hash],
 | 
			
		||||
				vec![],
 | 
			
		||||
				start.elapsed(),
 | 
			
		||||
			);
 | 
			
		||||
		});
 | 
			
		||||
		self.db.read().key_value().flush().expect("DB flush failed.");
 | 
			
		||||
		Ok(h)
 | 
			
		||||
		Ok(hash)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -300,6 +300,11 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
 | 
			
		||||
	/// Get block queue information.
 | 
			
		||||
	fn queue_info(&self) -> BlockQueueInfo;
 | 
			
		||||
 | 
			
		||||
	/// Returns true if block queue is empty.
 | 
			
		||||
	fn is_queue_empty(&self) -> bool {
 | 
			
		||||
		self.queue_info().is_empty()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Clear block queue and abort all import activity.
 | 
			
		||||
	fn clear_queue(&self);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -576,7 +576,7 @@ impl Miner {
 | 
			
		||||
		trace!(target: "miner", "requires_reseal: sealing enabled");
 | 
			
		||||
 | 
			
		||||
		// Disable sealing if there were no requests for SEALING_TIMEOUT_IN_BLOCKS
 | 
			
		||||
		let had_requests = sealing.last_request.map(|last_request| 
 | 
			
		||||
		let had_requests = sealing.last_request.map(|last_request|
 | 
			
		||||
			best_block.saturating_sub(last_request) <= SEALING_TIMEOUT_IN_BLOCKS
 | 
			
		||||
		).unwrap_or(false);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -583,6 +583,13 @@ impl<K: Kind> VerificationQueue<K> {
 | 
			
		||||
		result
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Returns true if there is nothing currently in the queue.
 | 
			
		||||
	/// TODO [ToDr] Optimize to avoid locking
 | 
			
		||||
	pub fn is_empty(&self) -> bool {
 | 
			
		||||
		let v = &self.verification;
 | 
			
		||||
		v.unverified.lock().is_empty() && v.verifying.lock().is_empty() && v.verified.lock().is_empty()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Get queue status.
 | 
			
		||||
	pub fn queue_info(&self) -> QueueInfo {
 | 
			
		||||
		use std::mem::size_of;
 | 
			
		||||
 | 
			
		||||
@ -14,7 +14,7 @@
 | 
			
		||||
// You should have received a copy of the GNU General Public License
 | 
			
		||||
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::sync::{Arc, mpsc, atomic};
 | 
			
		||||
use std::collections::{HashMap, BTreeMap};
 | 
			
		||||
use std::io;
 | 
			
		||||
use std::ops::Range;
 | 
			
		||||
@ -33,10 +33,10 @@ use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainMessageTyp
 | 
			
		||||
use ethcore::snapshot::SnapshotService;
 | 
			
		||||
use ethcore::header::BlockNumber;
 | 
			
		||||
use sync_io::NetSyncIo;
 | 
			
		||||
use chain::{ChainSync, SyncStatus as EthSyncStatus};
 | 
			
		||||
use chain::{ChainSyncApi, SyncStatus as EthSyncStatus};
 | 
			
		||||
use std::net::{SocketAddr, AddrParseError};
 | 
			
		||||
use std::str::FromStr;
 | 
			
		||||
use parking_lot::RwLock;
 | 
			
		||||
use parking_lot::{RwLock, Mutex};
 | 
			
		||||
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
 | 
			
		||||
	PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3,
 | 
			
		||||
	PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
 | 
			
		||||
@ -228,6 +228,37 @@ impl AttachedProtocol {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// A prioritized tasks run in a specialised timer.
 | 
			
		||||
/// Every task should be completed within a hard deadline,
 | 
			
		||||
/// if it's not it's either cancelled or split into multiple tasks.
 | 
			
		||||
/// NOTE These tasks might not complete at all, so anything
 | 
			
		||||
/// that happens here should work even if the task is cancelled.
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
pub enum PriorityTask {
 | 
			
		||||
	/// Propagate given block
 | 
			
		||||
	PropagateBlock {
 | 
			
		||||
		/// When the task was initiated
 | 
			
		||||
		started: ::std::time::Instant,
 | 
			
		||||
		/// Raw block RLP to propagate
 | 
			
		||||
		block: Bytes,
 | 
			
		||||
		/// Block hash
 | 
			
		||||
		hash: H256,
 | 
			
		||||
		/// Blocks difficulty
 | 
			
		||||
		difficulty: U256,
 | 
			
		||||
	},
 | 
			
		||||
	/// Propagate a list of transactions
 | 
			
		||||
	PropagateTransactions(::std::time::Instant, Arc<atomic::AtomicBool>),
 | 
			
		||||
}
 | 
			
		||||
impl PriorityTask {
 | 
			
		||||
	/// Mark the task as being processed, right after it's retrieved from the queue.
 | 
			
		||||
	pub fn starting(&self) {
 | 
			
		||||
		match *self {
 | 
			
		||||
			PriorityTask::PropagateTransactions(_, ref is_ready) => is_ready.store(true, atomic::Ordering::SeqCst),
 | 
			
		||||
			_ => {},
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// EthSync initialization parameters.
 | 
			
		||||
pub struct Params {
 | 
			
		||||
	/// Configuration.
 | 
			
		||||
@ -260,6 +291,8 @@ pub struct EthSync {
 | 
			
		||||
	subprotocol_name: [u8; 3],
 | 
			
		||||
	/// Light subprotocol name.
 | 
			
		||||
	light_subprotocol_name: [u8; 3],
 | 
			
		||||
	/// Priority tasks notification channel
 | 
			
		||||
	priority_tasks: Mutex<mpsc::Sender<PriorityTask>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn light_params(
 | 
			
		||||
@ -312,13 +345,19 @@ impl EthSync {
 | 
			
		||||
			})
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
		let chain_sync = ChainSync::new(params.config, &*params.chain, params.private_tx_handler.clone());
 | 
			
		||||
		let (priority_tasks_tx, priority_tasks_rx) = mpsc::channel();
 | 
			
		||||
		let sync = ChainSyncApi::new(
 | 
			
		||||
			params.config,
 | 
			
		||||
			&*params.chain,
 | 
			
		||||
			params.private_tx_handler.clone(),
 | 
			
		||||
			priority_tasks_rx,
 | 
			
		||||
		);
 | 
			
		||||
		let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?;
 | 
			
		||||
 | 
			
		||||
		let sync = Arc::new(EthSync {
 | 
			
		||||
			network: service,
 | 
			
		||||
			eth_handler: Arc::new(SyncProtocolHandler {
 | 
			
		||||
				sync: RwLock::new(chain_sync),
 | 
			
		||||
				sync,
 | 
			
		||||
				chain: params.chain,
 | 
			
		||||
				snapshot_service: params.snapshot_service,
 | 
			
		||||
				overlay: RwLock::new(HashMap::new()),
 | 
			
		||||
@ -327,26 +366,32 @@ impl EthSync {
 | 
			
		||||
			subprotocol_name: params.config.subprotocol_name,
 | 
			
		||||
			light_subprotocol_name: params.config.light_subprotocol_name,
 | 
			
		||||
			attached_protos: params.attached_protos,
 | 
			
		||||
			priority_tasks: Mutex::new(priority_tasks_tx),
 | 
			
		||||
		});
 | 
			
		||||
 | 
			
		||||
		Ok(sync)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Priority tasks producer
 | 
			
		||||
	pub fn priority_tasks(&self) -> mpsc::Sender<PriorityTask> {
 | 
			
		||||
		self.priority_tasks.lock().clone()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl SyncProvider for EthSync {
 | 
			
		||||
	/// Get sync status
 | 
			
		||||
	fn status(&self) -> EthSyncStatus {
 | 
			
		||||
		self.eth_handler.sync.read().status()
 | 
			
		||||
		self.eth_handler.sync.status()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Get sync peers
 | 
			
		||||
	fn peers(&self) -> Vec<PeerInfo> {
 | 
			
		||||
		self.network.with_context_eval(self.subprotocol_name, |ctx| {
 | 
			
		||||
			let peer_ids = self.network.connected_peers();
 | 
			
		||||
			let eth_sync = self.eth_handler.sync.read();
 | 
			
		||||
			let light_proto = self.light_proto.as_ref();
 | 
			
		||||
 | 
			
		||||
			peer_ids.into_iter().filter_map(|peer_id| {
 | 
			
		||||
			let peer_info = self.eth_handler.sync.peer_info(&peer_ids);
 | 
			
		||||
			peer_ids.into_iter().zip(peer_info).filter_map(|(peer_id, peer_info)| {
 | 
			
		||||
				let session_info = match ctx.session_info(peer_id) {
 | 
			
		||||
					None => return None,
 | 
			
		||||
					Some(info) => info,
 | 
			
		||||
@ -358,7 +403,7 @@ impl SyncProvider for EthSync {
 | 
			
		||||
					capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(),
 | 
			
		||||
					remote_address: session_info.remote_address,
 | 
			
		||||
					local_address: session_info.local_address,
 | 
			
		||||
					eth_info: eth_sync.peer_info(&peer_id),
 | 
			
		||||
					eth_info: peer_info,
 | 
			
		||||
					pip_info: light_proto.as_ref().and_then(|lp| lp.peer_status(peer_id)).map(Into::into),
 | 
			
		||||
				})
 | 
			
		||||
			}).collect()
 | 
			
		||||
@ -370,17 +415,16 @@ impl SyncProvider for EthSync {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats> {
 | 
			
		||||
		let sync = self.eth_handler.sync.read();
 | 
			
		||||
		sync.transactions_stats()
 | 
			
		||||
			.iter()
 | 
			
		||||
			.map(|(hash, stats)| (*hash, stats.into()))
 | 
			
		||||
			.collect()
 | 
			
		||||
		self.eth_handler.sync.transactions_stats()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const PEERS_TIMER: TimerToken = 0;
 | 
			
		||||
const SYNC_TIMER: TimerToken = 1;
 | 
			
		||||
const TX_TIMER: TimerToken = 2;
 | 
			
		||||
const PRIORITY_TIMER: TimerToken = 3;
 | 
			
		||||
 | 
			
		||||
pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250);
 | 
			
		||||
 | 
			
		||||
struct SyncProtocolHandler {
 | 
			
		||||
	/// Shared blockchain client.
 | 
			
		||||
@ -388,7 +432,7 @@ struct SyncProtocolHandler {
 | 
			
		||||
	/// Shared snapshot service.
 | 
			
		||||
	snapshot_service: Arc<SnapshotService>,
 | 
			
		||||
	/// Sync strategy
 | 
			
		||||
	sync: RwLock<ChainSync>,
 | 
			
		||||
	sync: ChainSyncApi,
 | 
			
		||||
	/// Chain overlay used to cache data such as fork block.
 | 
			
		||||
	overlay: RwLock<HashMap<BlockNumber, Bytes>>,
 | 
			
		||||
}
 | 
			
		||||
@ -399,11 +443,13 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
 | 
			
		||||
			io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer");
 | 
			
		||||
			io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
 | 
			
		||||
			io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer");
 | 
			
		||||
 | 
			
		||||
			io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL).expect("Error registering peers timer");
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
 | 
			
		||||
		ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
 | 
			
		||||
		self.sync.dispatch_packet(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn connected(&self, io: &NetworkContext, peer: &PeerId) {
 | 
			
		||||
@ -429,15 +475,26 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
 | 
			
		||||
		match timer {
 | 
			
		||||
			PEERS_TIMER => self.sync.write().maintain_peers(&mut io),
 | 
			
		||||
			SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
 | 
			
		||||
			TX_TIMER => {
 | 
			
		||||
				self.sync.write().propagate_new_transactions(&mut io);
 | 
			
		||||
			},
 | 
			
		||||
			TX_TIMER => self.sync.write().propagate_new_transactions(&mut io),
 | 
			
		||||
			PRIORITY_TIMER => self.sync.process_priority_queue(&mut io),
 | 
			
		||||
			_ => warn!("Unknown timer {} triggered.", timer),
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ChainNotify for EthSync {
 | 
			
		||||
	fn block_pre_import(&self, bytes: &Bytes, hash: &H256, difficulty: &U256) {
 | 
			
		||||
		let task = PriorityTask::PropagateBlock {
 | 
			
		||||
			started: ::std::time::Instant::now(),
 | 
			
		||||
			block: bytes.clone(),
 | 
			
		||||
			hash: *hash,
 | 
			
		||||
			difficulty: *difficulty,
 | 
			
		||||
		};
 | 
			
		||||
		if let Err(e) = self.priority_tasks.lock().send(task) {
 | 
			
		||||
			warn!(target: "sync", "Unexpected error during priority block propagation: {:?}", e);
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn new_blocks(&self,
 | 
			
		||||
		imported: Vec<H256>,
 | 
			
		||||
		invalid: Vec<H256>,
 | 
			
		||||
 | 
			
		||||
@ -29,7 +29,6 @@ use rlp::Rlp;
 | 
			
		||||
use snapshot::ChunkType;
 | 
			
		||||
use std::cmp;
 | 
			
		||||
use std::mem;
 | 
			
		||||
use std::collections::HashSet;
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
use sync_io::SyncIo;
 | 
			
		||||
 | 
			
		||||
@ -58,7 +57,6 @@ use super::{
 | 
			
		||||
	SNAPSHOT_DATA_PACKET,
 | 
			
		||||
	SNAPSHOT_MANIFEST_PACKET,
 | 
			
		||||
	STATUS_PACKET,
 | 
			
		||||
	TRANSACTIONS_PACKET,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
/// The Chain Sync Handler: handles responses from peers
 | 
			
		||||
@ -67,14 +65,9 @@ pub struct SyncHandler;
 | 
			
		||||
impl SyncHandler {
 | 
			
		||||
	/// Handle incoming packet from peer
 | 
			
		||||
	pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
 | 
			
		||||
		if packet_id != STATUS_PACKET && !sync.peers.contains_key(&peer) {
 | 
			
		||||
			debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer));
 | 
			
		||||
			return;
 | 
			
		||||
		}
 | 
			
		||||
		let rlp = Rlp::new(data);
 | 
			
		||||
		let result = match packet_id {
 | 
			
		||||
			STATUS_PACKET => SyncHandler::on_peer_status(sync, io, peer, &rlp),
 | 
			
		||||
			TRANSACTIONS_PACKET => SyncHandler::on_peer_transactions(sync, io, peer, &rlp),
 | 
			
		||||
			BLOCK_HEADERS_PACKET => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp),
 | 
			
		||||
			BLOCK_BODIES_PACKET => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp),
 | 
			
		||||
			RECEIPTS_PACKET => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
 | 
			
		||||
@ -109,10 +102,9 @@ impl SyncHandler {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Called when peer sends us new consensus packet
 | 
			
		||||
	pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
 | 
			
		||||
	pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) {
 | 
			
		||||
		trace!(target: "sync", "Received consensus packet from {:?}", peer_id);
 | 
			
		||||
		io.chain().queue_consensus_message(r.as_raw().to_vec());
 | 
			
		||||
		Ok(())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Called by peer when it is disconnecting
 | 
			
		||||
@ -578,8 +570,8 @@ impl SyncHandler {
 | 
			
		||||
			asking_blocks: Vec::new(),
 | 
			
		||||
			asking_hash: None,
 | 
			
		||||
			ask_time: Instant::now(),
 | 
			
		||||
			last_sent_transactions: HashSet::new(),
 | 
			
		||||
			last_sent_private_transactions: HashSet::new(),
 | 
			
		||||
			last_sent_transactions: Default::default(),
 | 
			
		||||
			last_sent_private_transactions: Default::default(),
 | 
			
		||||
			expired: false,
 | 
			
		||||
			confirmation: if sync.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
 | 
			
		||||
			asking_snapshot_data: None,
 | 
			
		||||
@ -635,7 +627,7 @@ impl SyncHandler {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Called when peer sends us new transactions
 | 
			
		||||
	fn on_peer_transactions(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
 | 
			
		||||
	pub fn on_peer_transactions(sync: &ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
 | 
			
		||||
		// Accept transactions only when fully synced
 | 
			
		||||
		if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) {
 | 
			
		||||
			trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
 | 
			
		||||
 | 
			
		||||
@ -92,17 +92,17 @@ mod propagator;
 | 
			
		||||
mod requester;
 | 
			
		||||
mod supplier;
 | 
			
		||||
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::collections::{HashSet, HashMap};
 | 
			
		||||
use std::sync::{Arc, mpsc};
 | 
			
		||||
use std::collections::{HashSet, HashMap, BTreeMap};
 | 
			
		||||
use std::cmp;
 | 
			
		||||
use std::time::{Duration, Instant};
 | 
			
		||||
use hash::keccak;
 | 
			
		||||
use heapsize::HeapSizeOf;
 | 
			
		||||
use ethereum_types::{H256, U256};
 | 
			
		||||
use fastmap::H256FastMap;
 | 
			
		||||
use parking_lot::RwLock;
 | 
			
		||||
use fastmap::{H256FastMap, H256FastSet};
 | 
			
		||||
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
 | 
			
		||||
use bytes::Bytes;
 | 
			
		||||
use rlp::{Rlp, RlpStream, DecoderError};
 | 
			
		||||
use rlp::{RlpStream, DecoderError};
 | 
			
		||||
use network::{self, PeerId, PacketId};
 | 
			
		||||
use ethcore::header::{BlockNumber};
 | 
			
		||||
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo};
 | 
			
		||||
@ -112,7 +112,7 @@ use super::{WarpSync, SyncConfig};
 | 
			
		||||
use block_sync::{BlockDownloader, DownloadAction};
 | 
			
		||||
use rand::Rng;
 | 
			
		||||
use snapshot::{Snapshot};
 | 
			
		||||
use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
 | 
			
		||||
use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID, PriorityTask};
 | 
			
		||||
use private_tx::PrivateTxHandler;
 | 
			
		||||
use transactions_stats::{TransactionsStats, Stats as TransactionStats};
 | 
			
		||||
use transaction::UnverifiedTransaction;
 | 
			
		||||
@ -120,7 +120,7 @@ use transaction::UnverifiedTransaction;
 | 
			
		||||
use self::handler::SyncHandler;
 | 
			
		||||
use self::propagator::SyncPropagator;
 | 
			
		||||
use self::requester::SyncRequester;
 | 
			
		||||
use self::supplier::SyncSupplier;
 | 
			
		||||
pub(crate) use self::supplier::SyncSupplier;
 | 
			
		||||
 | 
			
		||||
known_heap_size!(0, PeerInfo);
 | 
			
		||||
 | 
			
		||||
@ -187,6 +187,11 @@ const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3);
 | 
			
		||||
const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5);
 | 
			
		||||
const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120);
 | 
			
		||||
 | 
			
		||||
/// Defines how much time we have to complete priority transaction or block propagation.
 | 
			
		||||
/// after the deadline is reached the task is considered finished
 | 
			
		||||
/// (so we might sent only to some part of the peers we originally intended to send to)
 | 
			
		||||
const PRIORITY_TASK_DEADLINE: Duration = Duration::from_millis(100);
 | 
			
		||||
 | 
			
		||||
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
 | 
			
		||||
/// Sync state
 | 
			
		||||
pub enum SyncState {
 | 
			
		||||
@ -323,9 +328,9 @@ pub struct PeerInfo {
 | 
			
		||||
	/// Request timestamp
 | 
			
		||||
	ask_time: Instant,
 | 
			
		||||
	/// Holds a set of transactions recently sent to this peer to avoid spamming.
 | 
			
		||||
	last_sent_transactions: HashSet<H256>,
 | 
			
		||||
	last_sent_transactions: H256FastSet,
 | 
			
		||||
	/// Holds a set of private transactions and their signatures recently sent to this peer to avoid spamming.
 | 
			
		||||
	last_sent_private_transactions: HashSet<H256>,
 | 
			
		||||
	last_sent_private_transactions: H256FastSet,
 | 
			
		||||
	/// Pending request is expired and result should be ignored
 | 
			
		||||
	expired: bool,
 | 
			
		||||
	/// Peer fork confirmation status
 | 
			
		||||
@ -375,6 +380,217 @@ pub mod random {
 | 
			
		||||
pub type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
 | 
			
		||||
pub type Peers = HashMap<PeerId, PeerInfo>;
 | 
			
		||||
 | 
			
		||||
/// Thread-safe wrapper for `ChainSync`.
 | 
			
		||||
///
 | 
			
		||||
/// NOTE always lock in order of fields declaration
 | 
			
		||||
pub struct ChainSyncApi {
 | 
			
		||||
	/// Priority tasks queue
 | 
			
		||||
	priority_tasks: Mutex<mpsc::Receiver<PriorityTask>>,
 | 
			
		||||
	/// The rest of sync data
 | 
			
		||||
	sync: RwLock<ChainSync>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ChainSyncApi {
 | 
			
		||||
	/// Creates new `ChainSyncApi`
 | 
			
		||||
	pub fn new(
 | 
			
		||||
		config: SyncConfig,
 | 
			
		||||
		chain: &BlockChainClient,
 | 
			
		||||
		private_tx_handler: Arc<PrivateTxHandler>,
 | 
			
		||||
		priority_tasks: mpsc::Receiver<PriorityTask>,
 | 
			
		||||
	) -> Self {
 | 
			
		||||
		ChainSyncApi {
 | 
			
		||||
			sync: RwLock::new(ChainSync::new(config, chain, private_tx_handler)),
 | 
			
		||||
			priority_tasks: Mutex::new(priority_tasks),
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Gives `write` access to underlying `ChainSync`
 | 
			
		||||
	pub fn write(&self) -> RwLockWriteGuard<ChainSync> {
 | 
			
		||||
		self.sync.write()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Returns info about given list of peers
 | 
			
		||||
	pub fn peer_info(&self, ids: &[PeerId]) -> Vec<Option<PeerInfoDigest>> {
 | 
			
		||||
		let sync = self.sync.read();
 | 
			
		||||
		ids.iter().map(|id| sync.peer_info(id)).collect()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Returns synchonization status
 | 
			
		||||
	pub fn status(&self) -> SyncStatus {
 | 
			
		||||
		self.sync.read().status()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Returns transactions propagation statistics
 | 
			
		||||
	pub fn transactions_stats(&self) -> BTreeMap<H256, ::TransactionStats> {
 | 
			
		||||
		self.sync.read().transactions_stats()
 | 
			
		||||
			.iter()
 | 
			
		||||
			.map(|(hash, stats)| (*hash, stats.into()))
 | 
			
		||||
			.collect()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Dispatch incoming requests and responses
 | 
			
		||||
	pub fn dispatch_packet(&self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
 | 
			
		||||
		SyncSupplier::dispatch_packet(&self.sync, io, peer, packet_id, data)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Process a priority propagation queue.
 | 
			
		||||
	/// This task is run from a timer and should be time constrained.
 | 
			
		||||
	/// Hence we set up a deadline for the execution and cancel the task if the deadline is exceeded.
 | 
			
		||||
	///
 | 
			
		||||
	/// NOTE This method should only handle stuff that can be canceled and would reach other peers
 | 
			
		||||
	/// by other means.
 | 
			
		||||
	pub fn process_priority_queue(&self, io: &mut SyncIo) {
 | 
			
		||||
		fn check_deadline(deadline: Instant) -> Option<Duration> {
 | 
			
		||||
			let now = Instant::now();
 | 
			
		||||
			if now > deadline {
 | 
			
		||||
				None
 | 
			
		||||
			} else {
 | 
			
		||||
				Some(deadline - now)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// deadline to get the task from the queue
 | 
			
		||||
		let deadline = Instant::now() + ::api::PRIORITY_TIMER_INTERVAL;
 | 
			
		||||
		let mut work = || {
 | 
			
		||||
			let task = {
 | 
			
		||||
				let tasks = self.priority_tasks.try_lock_until(deadline)?;
 | 
			
		||||
				let left = check_deadline(deadline)?;
 | 
			
		||||
				tasks.recv_timeout(left).ok()?
 | 
			
		||||
			};
 | 
			
		||||
			task.starting();
 | 
			
		||||
			// wait for the sync lock until deadline,
 | 
			
		||||
			// note we might drop the task here if we won't manage to acquire the lock.
 | 
			
		||||
			let mut sync = self.sync.try_write_until(deadline)?;
 | 
			
		||||
			// since we already have everything let's use a different deadline
 | 
			
		||||
			// to do the rest of the job now, so that previous work is not wasted.
 | 
			
		||||
			let deadline = Instant::now() + PRIORITY_TASK_DEADLINE;
 | 
			
		||||
			let as_ms = move |prev| {
 | 
			
		||||
				let dur: Duration = Instant::now() - prev;
 | 
			
		||||
				dur.as_secs() * 1_000 + dur.subsec_millis() as u64
 | 
			
		||||
			};
 | 
			
		||||
			match task {
 | 
			
		||||
				// NOTE We can't simply use existing methods,
 | 
			
		||||
				// cause the block is not in the DB yet.
 | 
			
		||||
				PriorityTask::PropagateBlock { started, block, hash, difficulty } => {
 | 
			
		||||
					// try to send to peers that are on the same block as us
 | 
			
		||||
					// (they will most likely accept the new block).
 | 
			
		||||
					let chain_info = io.chain().chain_info();
 | 
			
		||||
					let total_difficulty = chain_info.total_difficulty + difficulty;
 | 
			
		||||
					let rlp = ChainSync::create_block_rlp(&block, total_difficulty);
 | 
			
		||||
					for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) {
 | 
			
		||||
						check_deadline(deadline)?;
 | 
			
		||||
						for peer in peers {
 | 
			
		||||
							SyncPropagator::send_packet(io, *peer, NEW_BLOCK_PACKET, rlp.clone());
 | 
			
		||||
							if let Some(ref mut peer) = sync.peers.get_mut(peer) {
 | 
			
		||||
								peer.latest_hash = hash;
 | 
			
		||||
							}
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
					debug!(target: "sync", "Finished block propagation, took {}ms", as_ms(started));
 | 
			
		||||
				},
 | 
			
		||||
				PriorityTask::PropagateTransactions(time, _) => {
 | 
			
		||||
					SyncPropagator::propagate_new_transactions(&mut sync, io, || {
 | 
			
		||||
						check_deadline(deadline).is_some()
 | 
			
		||||
					});
 | 
			
		||||
					debug!(target: "sync", "Finished transaction propagation, took {}ms", as_ms(time));
 | 
			
		||||
				},
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			Some(())
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
		// Process as many items as we can until the deadline is reached.
 | 
			
		||||
		loop {
 | 
			
		||||
			if work().is_none() {
 | 
			
		||||
				return;
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Static methods
 | 
			
		||||
impl ChainSync {
 | 
			
		||||
	/// creates rlp to send for the tree defined by 'from' and 'to' hashes
 | 
			
		||||
	fn create_new_hashes_rlp(chain: &BlockChainClient, from: &H256, to: &H256) -> Option<Bytes> {
 | 
			
		||||
		match chain.tree_route(from, to) {
 | 
			
		||||
			Some(route) => {
 | 
			
		||||
				let uncles = chain.find_uncles(from).unwrap_or_else(Vec::new);
 | 
			
		||||
				match route.blocks.len() {
 | 
			
		||||
					0 => None,
 | 
			
		||||
					_ => {
 | 
			
		||||
						let mut blocks = route.blocks;
 | 
			
		||||
						blocks.extend(uncles);
 | 
			
		||||
						let mut rlp_stream = RlpStream::new_list(blocks.len());
 | 
			
		||||
						for block_hash in  blocks {
 | 
			
		||||
							let mut hash_rlp = RlpStream::new_list(2);
 | 
			
		||||
							let number = chain.block_header(BlockId::Hash(block_hash.clone()))
 | 
			
		||||
								.expect("chain.tree_route and chain.find_uncles only return hahses of blocks that are in the blockchain. qed.").number();
 | 
			
		||||
							hash_rlp.append(&block_hash);
 | 
			
		||||
							hash_rlp.append(&number);
 | 
			
		||||
							rlp_stream.append_raw(hash_rlp.as_raw(), 1);
 | 
			
		||||
						}
 | 
			
		||||
						Some(rlp_stream.out())
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			},
 | 
			
		||||
			None => None
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// creates rlp from block bytes and total difficulty
 | 
			
		||||
	fn create_block_rlp(bytes: &Bytes, total_difficulty: U256) -> Bytes {
 | 
			
		||||
		let mut rlp_stream = RlpStream::new_list(2);
 | 
			
		||||
		rlp_stream.append_raw(bytes, 1);
 | 
			
		||||
		rlp_stream.append(&total_difficulty);
 | 
			
		||||
		rlp_stream.out()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// creates latest block rlp for the given client
 | 
			
		||||
	fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes {
 | 
			
		||||
		Self::create_block_rlp(
 | 
			
		||||
			&chain.block(BlockId::Hash(chain.chain_info().best_block_hash))
 | 
			
		||||
				.expect("Best block always exists").into_inner(),
 | 
			
		||||
			chain.chain_info().total_difficulty
 | 
			
		||||
		)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// creates given hash block rlp for the given client
 | 
			
		||||
	fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes {
 | 
			
		||||
		Self::create_block_rlp(
 | 
			
		||||
			&chain.block(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed").into_inner(),
 | 
			
		||||
			chain.block_total_difficulty(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed.")
 | 
			
		||||
		)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn select_random_peers(peers: &[PeerId]) -> Vec<PeerId> {
 | 
			
		||||
		// take sqrt(x) peers
 | 
			
		||||
		let mut peers = peers.to_vec();
 | 
			
		||||
		let mut count = (peers.len() as f64).powf(0.5).round() as usize;
 | 
			
		||||
		count = cmp::min(count, MAX_PEERS_PROPAGATION);
 | 
			
		||||
		count = cmp::max(count, MIN_PEERS_PROPAGATION);
 | 
			
		||||
		random::new().shuffle(&mut peers);
 | 
			
		||||
		peers.truncate(count);
 | 
			
		||||
		peers
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn get_init_state(warp_sync: WarpSync, chain: &BlockChainClient) -> SyncState {
 | 
			
		||||
		let best_block = chain.chain_info().best_block_number;
 | 
			
		||||
		match warp_sync {
 | 
			
		||||
			WarpSync::Enabled => SyncState::WaitingPeers,
 | 
			
		||||
			WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers,
 | 
			
		||||
			_ => SyncState::Idle,
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// A peer query method for getting a list of peers
 | 
			
		||||
enum PeerState {
 | 
			
		||||
	/// Peer is on different hash than us
 | 
			
		||||
	Lagging,
 | 
			
		||||
	/// Peer is on the same block as us
 | 
			
		||||
	SameBlock
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Blockchain sync handler.
 | 
			
		||||
/// See module documentation for more details.
 | 
			
		||||
pub struct ChainSync {
 | 
			
		||||
@ -417,10 +633,14 @@ pub struct ChainSync {
 | 
			
		||||
 | 
			
		||||
impl ChainSync {
 | 
			
		||||
	/// Create a new instance of syncing strategy.
 | 
			
		||||
	pub fn new(config: SyncConfig, chain: &BlockChainClient, private_tx_handler: Arc<PrivateTxHandler>) -> ChainSync {
 | 
			
		||||
	pub fn new(
 | 
			
		||||
		config: SyncConfig,
 | 
			
		||||
		chain: &BlockChainClient,
 | 
			
		||||
		private_tx_handler: Arc<PrivateTxHandler>,
 | 
			
		||||
	) -> Self {
 | 
			
		||||
		let chain_info = chain.chain_info();
 | 
			
		||||
		let best_block = chain.chain_info().best_block_number;
 | 
			
		||||
		let state = ChainSync::get_init_state(config.warp_sync, chain);
 | 
			
		||||
		let state = Self::get_init_state(config.warp_sync, chain);
 | 
			
		||||
 | 
			
		||||
		let mut sync = ChainSync {
 | 
			
		||||
			state,
 | 
			
		||||
@ -445,15 +665,6 @@ impl ChainSync {
 | 
			
		||||
		sync
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn get_init_state(warp_sync: WarpSync, chain: &BlockChainClient) -> SyncState {
 | 
			
		||||
		let best_block = chain.chain_info().best_block_number;
 | 
			
		||||
		match warp_sync {
 | 
			
		||||
			WarpSync::Enabled => SyncState::WaitingPeers,
 | 
			
		||||
			WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers,
 | 
			
		||||
			_ => SyncState::Idle,
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Returns synchonization status
 | 
			
		||||
	pub fn status(&self) -> SyncStatus {
 | 
			
		||||
		let last_imported_number = self.new_blocks.last_imported_block_number();
 | 
			
		||||
@ -521,7 +732,7 @@ impl ChainSync {
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		self.state = state.unwrap_or_else(|| ChainSync::get_init_state(self.warp_sync, io.chain()));
 | 
			
		||||
		self.state = state.unwrap_or_else(|| Self::get_init_state(self.warp_sync, io.chain()));
 | 
			
		||||
		// Reactivate peers only if some progress has been made
 | 
			
		||||
		// since the last sync round of if starting fresh.
 | 
			
		||||
		self.active_peers = self.peers.keys().cloned().collect();
 | 
			
		||||
@ -1004,67 +1215,24 @@ impl ChainSync {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// creates rlp to send for the tree defined by 'from' and 'to' hashes
 | 
			
		||||
	fn create_new_hashes_rlp(chain: &BlockChainClient, from: &H256, to: &H256) -> Option<Bytes> {
 | 
			
		||||
		match chain.tree_route(from, to) {
 | 
			
		||||
			Some(route) => {
 | 
			
		||||
				let uncles = chain.find_uncles(from).unwrap_or_else(Vec::new);
 | 
			
		||||
				match route.blocks.len() {
 | 
			
		||||
					0 => None,
 | 
			
		||||
					_ => {
 | 
			
		||||
						let mut blocks = route.blocks;
 | 
			
		||||
						blocks.extend(uncles);
 | 
			
		||||
						let mut rlp_stream = RlpStream::new_list(blocks.len());
 | 
			
		||||
						for block_hash in  blocks {
 | 
			
		||||
							let mut hash_rlp = RlpStream::new_list(2);
 | 
			
		||||
							let number = chain.block_header(BlockId::Hash(block_hash.clone()))
 | 
			
		||||
								.expect("chain.tree_route and chain.find_uncles only return hahses of blocks that are in the blockchain. qed.").number();
 | 
			
		||||
							hash_rlp.append(&block_hash);
 | 
			
		||||
							hash_rlp.append(&number);
 | 
			
		||||
							rlp_stream.append_raw(hash_rlp.as_raw(), 1);
 | 
			
		||||
						}
 | 
			
		||||
						Some(rlp_stream.out())
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			},
 | 
			
		||||
			None => None
 | 
			
		||||
		}
 | 
			
		||||
	/// returns peer ids that have different block than our chain
 | 
			
		||||
	fn get_lagging_peers(&self, chain_info: &BlockChainInfo) -> Vec<PeerId> {
 | 
			
		||||
		self.get_peers(chain_info, PeerState::Lagging)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// creates rlp from block bytes and total difficulty
 | 
			
		||||
	fn create_block_rlp(bytes: &Bytes, total_difficulty: U256) -> Bytes {
 | 
			
		||||
		let mut rlp_stream = RlpStream::new_list(2);
 | 
			
		||||
		rlp_stream.append_raw(bytes, 1);
 | 
			
		||||
		rlp_stream.append(&total_difficulty);
 | 
			
		||||
		rlp_stream.out()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// creates latest block rlp for the given client
 | 
			
		||||
	fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes {
 | 
			
		||||
		ChainSync::create_block_rlp(
 | 
			
		||||
			&chain.block(BlockId::Hash(chain.chain_info().best_block_hash))
 | 
			
		||||
				.expect("Best block always exists").into_inner(),
 | 
			
		||||
			chain.chain_info().total_difficulty
 | 
			
		||||
		)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// creates given hash block rlp for the given client
 | 
			
		||||
	fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes {
 | 
			
		||||
		ChainSync::create_block_rlp(
 | 
			
		||||
			&chain.block(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed").into_inner(),
 | 
			
		||||
			chain.block_total_difficulty(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed.")
 | 
			
		||||
		)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// returns peer ids that have different blocks than our chain
 | 
			
		||||
	fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo) -> Vec<PeerId> {
 | 
			
		||||
	/// returns peer ids that have different or the same blocks than our chain
 | 
			
		||||
	fn get_peers(&self, chain_info: &BlockChainInfo, peers: PeerState) -> Vec<PeerId> {
 | 
			
		||||
		let latest_hash = chain_info.best_block_hash;
 | 
			
		||||
		self
 | 
			
		||||
			.peers
 | 
			
		||||
			.iter_mut()
 | 
			
		||||
			.iter()
 | 
			
		||||
			.filter_map(|(&id, ref mut peer_info)| {
 | 
			
		||||
				trace!(target: "sync", "Checking peer our best {} their best {}", latest_hash, peer_info.latest_hash);
 | 
			
		||||
				if peer_info.latest_hash != latest_hash {
 | 
			
		||||
				let matches = match peers {
 | 
			
		||||
					PeerState::Lagging => peer_info.latest_hash != latest_hash,
 | 
			
		||||
					PeerState::SameBlock => peer_info.latest_hash == latest_hash,
 | 
			
		||||
				};
 | 
			
		||||
				if matches {
 | 
			
		||||
					Some(id)
 | 
			
		||||
				} else {
 | 
			
		||||
					None
 | 
			
		||||
@ -1073,17 +1241,6 @@ impl ChainSync {
 | 
			
		||||
			.collect::<Vec<_>>()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn select_random_peers(peers: &[PeerId]) -> Vec<PeerId> {
 | 
			
		||||
		// take sqrt(x) peers
 | 
			
		||||
		let mut peers = peers.to_vec();
 | 
			
		||||
		let mut count = (peers.len() as f64).powf(0.5).round() as usize;
 | 
			
		||||
		count = cmp::min(count, MAX_PEERS_PROPAGATION);
 | 
			
		||||
		count = cmp::max(count, MIN_PEERS_PROPAGATION);
 | 
			
		||||
		random::new().shuffle(&mut peers);
 | 
			
		||||
		peers.truncate(count);
 | 
			
		||||
		peers
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn get_consensus_peers(&self) -> Vec<PeerId> {
 | 
			
		||||
		self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2.0 { Some(*id) } else { None }).collect()
 | 
			
		||||
	}
 | 
			
		||||
@ -1132,21 +1289,10 @@ impl ChainSync {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Dispatch incoming requests and responses
 | 
			
		||||
	pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
 | 
			
		||||
		SyncSupplier::dispatch_packet(sync, io, peer, packet_id, data)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
 | 
			
		||||
		debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id);
 | 
			
		||||
		SyncHandler::on_packet(self, io, peer, packet_id, data);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Called when peer sends us new consensus packet
 | 
			
		||||
	pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
 | 
			
		||||
		SyncHandler::on_consensus_packet(io, peer_id, r)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Called by peer when it is disconnecting
 | 
			
		||||
	pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) {
 | 
			
		||||
		SyncHandler::on_peer_aborting(self, io, peer);
 | 
			
		||||
@ -1158,8 +1304,16 @@ impl ChainSync {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// propagates new transactions to all peers
 | 
			
		||||
	pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize {
 | 
			
		||||
		SyncPropagator::propagate_new_transactions(self, io)
 | 
			
		||||
	pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) {
 | 
			
		||||
		let deadline = Instant::now() + Duration::from_millis(500);
 | 
			
		||||
		SyncPropagator::propagate_new_transactions(self, io, || {
 | 
			
		||||
			if deadline > Instant::now() {
 | 
			
		||||
				true
 | 
			
		||||
			} else {
 | 
			
		||||
				debug!(target: "sync", "Wasn't able to finish transaction propagation within a deadline.");
 | 
			
		||||
				false
 | 
			
		||||
			}
 | 
			
		||||
		});
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Broadcast consensus message to peers.
 | 
			
		||||
@ -1175,7 +1329,7 @@ impl ChainSync {
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
pub mod tests {
 | 
			
		||||
	use std::collections::{HashSet, VecDeque};
 | 
			
		||||
	use std::collections::{VecDeque};
 | 
			
		||||
	use ethkey;
 | 
			
		||||
	use network::PeerId;
 | 
			
		||||
	use tests::helpers::{TestIo};
 | 
			
		||||
@ -1291,8 +1445,8 @@ pub mod tests {
 | 
			
		||||
				asking_blocks: Vec::new(),
 | 
			
		||||
				asking_hash: None,
 | 
			
		||||
				ask_time: Instant::now(),
 | 
			
		||||
				last_sent_transactions: HashSet::new(),
 | 
			
		||||
				last_sent_private_transactions: HashSet::new(),
 | 
			
		||||
				last_sent_transactions: Default::default(),
 | 
			
		||||
				last_sent_private_transactions: Default::default(),
 | 
			
		||||
				expired: false,
 | 
			
		||||
				confirmation: super::ForkConfirmation::Confirmed,
 | 
			
		||||
				snapshot_number: None,
 | 
			
		||||
@ -1307,7 +1461,7 @@ pub mod tests {
 | 
			
		||||
	fn finds_lagging_peers() {
 | 
			
		||||
		let mut client = TestBlockChainClient::new();
 | 
			
		||||
		client.add_blocks(100, EachBlockWith::Uncle);
 | 
			
		||||
		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client);
 | 
			
		||||
		let sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client);
 | 
			
		||||
		let chain_info = client.chain_info();
 | 
			
		||||
 | 
			
		||||
		let lagging_peers = sync.get_lagging_peers(&chain_info);
 | 
			
		||||
@ -1447,3 +1601,4 @@ pub mod tests {
 | 
			
		||||
		assert_eq!(status.status.transaction_count, 0);
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -18,6 +18,7 @@ use bytes::Bytes;
 | 
			
		||||
use ethereum_types::H256;
 | 
			
		||||
use ethcore::client::BlockChainInfo;
 | 
			
		||||
use ethcore::header::BlockNumber;
 | 
			
		||||
use fastmap::H256FastSet;
 | 
			
		||||
use network::{PeerId, PacketId};
 | 
			
		||||
use rand::Rng;
 | 
			
		||||
use rlp::{Encodable, RlpStream};
 | 
			
		||||
@ -69,49 +70,51 @@ impl SyncPropagator {
 | 
			
		||||
	/// propagates latest block to a set of peers
 | 
			
		||||
	pub fn propagate_blocks(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut SyncIo, blocks: &[H256], peers: &[PeerId]) -> usize {
 | 
			
		||||
		trace!(target: "sync", "Sending NewBlocks to {:?}", peers);
 | 
			
		||||
		let mut sent = 0;
 | 
			
		||||
		for peer_id in peers {
 | 
			
		||||
			if blocks.is_empty() {
 | 
			
		||||
				let rlp =  ChainSync::create_latest_block_rlp(io.chain());
 | 
			
		||||
				SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp);
 | 
			
		||||
			} else {
 | 
			
		||||
				for h in blocks {
 | 
			
		||||
					let rlp =  ChainSync::create_new_block_rlp(io.chain(), h);
 | 
			
		||||
					SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp);
 | 
			
		||||
		let sent = peers.len();
 | 
			
		||||
		let mut send_packet = |io: &mut SyncIo, rlp: Bytes| {
 | 
			
		||||
			for peer_id in peers {
 | 
			
		||||
				SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
 | 
			
		||||
				if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
 | 
			
		||||
					peer.latest_hash = chain_info.best_block_hash.clone();
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
 | 
			
		||||
				peer.latest_hash = chain_info.best_block_hash.clone();
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
		if blocks.is_empty() {
 | 
			
		||||
			let rlp =  ChainSync::create_latest_block_rlp(io.chain());
 | 
			
		||||
			send_packet(io, rlp);
 | 
			
		||||
		} else {
 | 
			
		||||
			for h in blocks {
 | 
			
		||||
				let rlp =  ChainSync::create_new_block_rlp(io.chain(), h);
 | 
			
		||||
				send_packet(io, rlp);
 | 
			
		||||
			}
 | 
			
		||||
			sent += 1;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		sent
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// propagates new known hashes to all peers
 | 
			
		||||
	pub fn propagate_new_hashes(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[PeerId]) -> usize {
 | 
			
		||||
		trace!(target: "sync", "Sending NewHashes to {:?}", peers);
 | 
			
		||||
		let mut sent = 0;
 | 
			
		||||
		let last_parent = *io.chain().best_block_header().parent_hash();
 | 
			
		||||
		let best_block_hash = chain_info.best_block_hash;
 | 
			
		||||
		let rlp = match ChainSync::create_new_hashes_rlp(io.chain(), &last_parent, &best_block_hash) {
 | 
			
		||||
			Some(rlp) => rlp,
 | 
			
		||||
			None => return 0
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
		let sent = peers.len();
 | 
			
		||||
		for peer_id in peers {
 | 
			
		||||
			sent += match ChainSync::create_new_hashes_rlp(io.chain(), &last_parent, &chain_info.best_block_hash) {
 | 
			
		||||
				Some(rlp) => {
 | 
			
		||||
					{
 | 
			
		||||
						if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
 | 
			
		||||
							peer.latest_hash = chain_info.best_block_hash.clone();
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
					SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp);
 | 
			
		||||
					1
 | 
			
		||||
				},
 | 
			
		||||
				None => 0
 | 
			
		||||
			if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
 | 
			
		||||
				peer.latest_hash = best_block_hash;
 | 
			
		||||
			}
 | 
			
		||||
			SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone());
 | 
			
		||||
		}
 | 
			
		||||
		sent
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// propagates new transactions to all peers
 | 
			
		||||
	pub fn propagate_new_transactions(sync: &mut ChainSync, io: &mut SyncIo) -> usize {
 | 
			
		||||
	pub fn propagate_new_transactions<F: FnMut() -> bool>(sync: &mut ChainSync, io: &mut SyncIo, mut should_continue: F) -> usize {
 | 
			
		||||
		// Early out if nobody to send to.
 | 
			
		||||
		if sync.peers.is_empty() {
 | 
			
		||||
			return 0;
 | 
			
		||||
@ -122,6 +125,10 @@ impl SyncPropagator {
 | 
			
		||||
			return 0;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if !should_continue() {
 | 
			
		||||
			return 0;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		let (transactions, service_transactions): (Vec<_>, Vec<_>) = transactions.iter()
 | 
			
		||||
			.map(|tx| tx.signed())
 | 
			
		||||
			.partition(|tx| !tx.gas_price.is_zero());
 | 
			
		||||
@ -130,24 +137,34 @@ impl SyncPropagator {
 | 
			
		||||
		let mut affected_peers = HashSet::new();
 | 
			
		||||
		if !transactions.is_empty() {
 | 
			
		||||
			let peers = SyncPropagator::select_peers_for_transactions(sync, |_| true);
 | 
			
		||||
			affected_peers = SyncPropagator::propagate_transactions_to_peers(sync, io, peers, transactions);
 | 
			
		||||
			affected_peers = SyncPropagator::propagate_transactions_to_peers(
 | 
			
		||||
				sync, io, peers, transactions, &mut should_continue,
 | 
			
		||||
			);
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// most of times service_transactions will be empty
 | 
			
		||||
		// => there's no need to merge packets
 | 
			
		||||
		if !service_transactions.is_empty() {
 | 
			
		||||
			let service_transactions_peers = SyncPropagator::select_peers_for_transactions(sync, |peer_id| accepts_service_transaction(&io.peer_info(*peer_id)));
 | 
			
		||||
			let service_transactions_affected_peers = SyncPropagator::propagate_transactions_to_peers(sync, io, service_transactions_peers, service_transactions);
 | 
			
		||||
			let service_transactions_affected_peers = SyncPropagator::propagate_transactions_to_peers(
 | 
			
		||||
				sync, io, service_transactions_peers, service_transactions, &mut should_continue
 | 
			
		||||
			);
 | 
			
		||||
			affected_peers.extend(&service_transactions_affected_peers);
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		affected_peers.len()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn propagate_transactions_to_peers(sync: &mut ChainSync, io: &mut SyncIo, peers: Vec<PeerId>, transactions: Vec<&SignedTransaction>) -> HashSet<PeerId> {
 | 
			
		||||
	fn propagate_transactions_to_peers<F: FnMut() -> bool>(
 | 
			
		||||
		sync: &mut ChainSync,
 | 
			
		||||
		io: &mut SyncIo,
 | 
			
		||||
		peers: Vec<PeerId>,
 | 
			
		||||
		transactions: Vec<&SignedTransaction>,
 | 
			
		||||
		mut should_continue: F,
 | 
			
		||||
	) -> HashSet<PeerId> {
 | 
			
		||||
		let all_transactions_hashes = transactions.iter()
 | 
			
		||||
			.map(|tx| tx.hash())
 | 
			
		||||
			.collect::<HashSet<H256>>();
 | 
			
		||||
			.collect::<H256FastSet>();
 | 
			
		||||
		let all_transactions_rlp = {
 | 
			
		||||
			let mut packet = RlpStream::new_list(transactions.len());
 | 
			
		||||
			for tx in &transactions { packet.append(&**tx); }
 | 
			
		||||
@ -157,102 +174,104 @@ impl SyncPropagator {
 | 
			
		||||
		// Clear old transactions from stats
 | 
			
		||||
		sync.transactions_stats.retain(&all_transactions_hashes);
 | 
			
		||||
 | 
			
		||||
		// sqrt(x)/x scaled to max u32
 | 
			
		||||
		let block_number = io.chain().chain_info().best_block_number;
 | 
			
		||||
 | 
			
		||||
		let lucky_peers = {
 | 
			
		||||
			peers.into_iter()
 | 
			
		||||
				.filter_map(|peer_id| {
 | 
			
		||||
					let stats = &mut sync.transactions_stats;
 | 
			
		||||
					let peer_info = sync.peers.get_mut(&peer_id)
 | 
			
		||||
						.expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed");
 | 
			
		||||
 | 
			
		||||
					// Send all transactions
 | 
			
		||||
					if peer_info.last_sent_transactions.is_empty() {
 | 
			
		||||
						// update stats
 | 
			
		||||
						for hash in &all_transactions_hashes {
 | 
			
		||||
							let id = io.peer_session_info(peer_id).and_then(|info| info.id);
 | 
			
		||||
							stats.propagated(hash, id, block_number);
 | 
			
		||||
						}
 | 
			
		||||
						peer_info.last_sent_transactions = all_transactions_hashes.clone();
 | 
			
		||||
						return Some((peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone()));
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
					// Get hashes of all transactions to send to this peer
 | 
			
		||||
					let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions)
 | 
			
		||||
						.cloned()
 | 
			
		||||
						.collect::<HashSet<_>>();
 | 
			
		||||
					if to_send.is_empty() {
 | 
			
		||||
						return None;
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
					// Construct RLP
 | 
			
		||||
					let (packet, to_send) = {
 | 
			
		||||
						let mut to_send = to_send;
 | 
			
		||||
						let mut packet = RlpStream::new();
 | 
			
		||||
						packet.begin_unbounded_list();
 | 
			
		||||
						let mut pushed = 0;
 | 
			
		||||
						for tx in &transactions {
 | 
			
		||||
							let hash = tx.hash();
 | 
			
		||||
							if to_send.contains(&hash) {
 | 
			
		||||
								let mut transaction = RlpStream::new();
 | 
			
		||||
								tx.rlp_append(&mut transaction);
 | 
			
		||||
								let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE);
 | 
			
		||||
								if !appended {
 | 
			
		||||
									// Maximal packet size reached just proceed with sending
 | 
			
		||||
									debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len());
 | 
			
		||||
									to_send = to_send.into_iter().take(pushed).collect();
 | 
			
		||||
									break;
 | 
			
		||||
								}
 | 
			
		||||
								pushed += 1;
 | 
			
		||||
							}
 | 
			
		||||
						}
 | 
			
		||||
						packet.complete_unbounded_list();
 | 
			
		||||
						(packet, to_send)
 | 
			
		||||
					};
 | 
			
		||||
 | 
			
		||||
					// Update stats
 | 
			
		||||
					let id = io.peer_session_info(peer_id).and_then(|info| info.id);
 | 
			
		||||
					for hash in &to_send {
 | 
			
		||||
						// update stats
 | 
			
		||||
						stats.propagated(hash, id, block_number);
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
					peer_info.last_sent_transactions = all_transactions_hashes
 | 
			
		||||
						.intersection(&peer_info.last_sent_transactions)
 | 
			
		||||
						.chain(&to_send)
 | 
			
		||||
						.cloned()
 | 
			
		||||
						.collect();
 | 
			
		||||
					Some((peer_id, to_send.len(), packet.out()))
 | 
			
		||||
				})
 | 
			
		||||
				.collect::<Vec<_>>()
 | 
			
		||||
		let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| {
 | 
			
		||||
			let size = rlp.len();
 | 
			
		||||
			SyncPropagator::send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp);
 | 
			
		||||
			trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size);
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
		// Send RLPs
 | 
			
		||||
		let mut peers = HashSet::new();
 | 
			
		||||
		if lucky_peers.len() > 0 {
 | 
			
		||||
			let mut max_sent = 0;
 | 
			
		||||
			let lucky_peers_len = lucky_peers.len();
 | 
			
		||||
			for (peer_id, sent, rlp) in lucky_peers {
 | 
			
		||||
				peers.insert(peer_id);
 | 
			
		||||
				let size = rlp.len();
 | 
			
		||||
				SyncPropagator::send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp);
 | 
			
		||||
				trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size);
 | 
			
		||||
				max_sent = cmp::max(max_sent, sent);
 | 
			
		||||
		let block_number = io.chain().chain_info().best_block_number;
 | 
			
		||||
		let mut sent_to_peers = HashSet::new();
 | 
			
		||||
		let mut max_sent = 0;
 | 
			
		||||
 | 
			
		||||
		// for every peer construct and send transactions packet
 | 
			
		||||
		for peer_id in peers {
 | 
			
		||||
			if !should_continue() {
 | 
			
		||||
				debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, sent_to_peers.len());
 | 
			
		||||
				return sent_to_peers;
 | 
			
		||||
			}
 | 
			
		||||
			debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, lucky_peers_len);
 | 
			
		||||
 | 
			
		||||
			let stats = &mut sync.transactions_stats;
 | 
			
		||||
			let peer_info = sync.peers.get_mut(&peer_id)
 | 
			
		||||
				.expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed");
 | 
			
		||||
 | 
			
		||||
			// Send all transactions, if the peer doesn't know about anything
 | 
			
		||||
			if peer_info.last_sent_transactions.is_empty() {
 | 
			
		||||
				// update stats
 | 
			
		||||
				for hash in &all_transactions_hashes {
 | 
			
		||||
					let id = io.peer_session_info(peer_id).and_then(|info| info.id);
 | 
			
		||||
					stats.propagated(hash, id, block_number);
 | 
			
		||||
				}
 | 
			
		||||
				peer_info.last_sent_transactions = all_transactions_hashes.clone();
 | 
			
		||||
 | 
			
		||||
				send_packet(io, peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone());
 | 
			
		||||
				sent_to_peers.insert(peer_id);
 | 
			
		||||
				max_sent = cmp::max(max_sent, all_transactions_hashes.len());
 | 
			
		||||
				continue;
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Get hashes of all transactions to send to this peer
 | 
			
		||||
			let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions)
 | 
			
		||||
				.cloned()
 | 
			
		||||
				.collect::<HashSet<_>>();
 | 
			
		||||
			if to_send.is_empty() {
 | 
			
		||||
				continue;
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Construct RLP
 | 
			
		||||
			let (packet, to_send) = {
 | 
			
		||||
				let mut to_send = to_send;
 | 
			
		||||
				let mut packet = RlpStream::new();
 | 
			
		||||
				packet.begin_unbounded_list();
 | 
			
		||||
				let mut pushed = 0;
 | 
			
		||||
				for tx in &transactions {
 | 
			
		||||
					let hash = tx.hash();
 | 
			
		||||
					if to_send.contains(&hash) {
 | 
			
		||||
						let mut transaction = RlpStream::new();
 | 
			
		||||
						tx.rlp_append(&mut transaction);
 | 
			
		||||
						let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE);
 | 
			
		||||
						if !appended {
 | 
			
		||||
							// Maximal packet size reached just proceed with sending
 | 
			
		||||
							debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len());
 | 
			
		||||
							to_send = to_send.into_iter().take(pushed).collect();
 | 
			
		||||
							break;
 | 
			
		||||
						}
 | 
			
		||||
						pushed += 1;
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				packet.complete_unbounded_list();
 | 
			
		||||
				(packet, to_send)
 | 
			
		||||
			};
 | 
			
		||||
 | 
			
		||||
			// Update stats
 | 
			
		||||
			let id = io.peer_session_info(peer_id).and_then(|info| info.id);
 | 
			
		||||
			for hash in &to_send {
 | 
			
		||||
				// update stats
 | 
			
		||||
				stats.propagated(hash, id, block_number);
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			peer_info.last_sent_transactions = all_transactions_hashes
 | 
			
		||||
				.intersection(&peer_info.last_sent_transactions)
 | 
			
		||||
				.chain(&to_send)
 | 
			
		||||
				.cloned()
 | 
			
		||||
				.collect();
 | 
			
		||||
			send_packet(io, peer_id, to_send.len(), packet.out());
 | 
			
		||||
			sent_to_peers.insert(peer_id);
 | 
			
		||||
			max_sent = cmp::max(max_sent, to_send.len());
 | 
			
		||||
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		peers
 | 
			
		||||
		debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, sent_to_peers.len());
 | 
			
		||||
		sent_to_peers
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pub fn propagate_latest_blocks(sync: &mut ChainSync, io: &mut SyncIo, sealed: &[H256]) {
 | 
			
		||||
		let chain_info = io.chain().chain_info();
 | 
			
		||||
		if (((chain_info.best_block_number as i64) - (sync.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
 | 
			
		||||
			let mut peers = sync.get_lagging_peers(&chain_info);
 | 
			
		||||
			let peers = sync.get_lagging_peers(&chain_info);
 | 
			
		||||
			if sealed.is_empty() {
 | 
			
		||||
				let hashes = SyncPropagator::propagate_new_hashes(sync, &chain_info, io, &peers);
 | 
			
		||||
				peers = ChainSync::select_random_peers(&peers);
 | 
			
		||||
				let peers = ChainSync::select_random_peers(&peers);
 | 
			
		||||
				let blocks = SyncPropagator::propagate_blocks(sync, &chain_info, io, sealed, &peers);
 | 
			
		||||
				if blocks != 0 || hashes != 0 {
 | 
			
		||||
					trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
 | 
			
		||||
@ -318,7 +337,7 @@ impl SyncPropagator {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Generic packet sender
 | 
			
		||||
	fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) {
 | 
			
		||||
	pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) {
 | 
			
		||||
		if let Err(e) = sync.send(peer_id, packet_id, packet) {
 | 
			
		||||
			debug!(target:"sync", "Error sending packet: {:?}", e);
 | 
			
		||||
			sync.disconnect_peer(peer_id);
 | 
			
		||||
@ -419,8 +438,8 @@ mod tests {
 | 
			
		||||
				asking_blocks: Vec::new(),
 | 
			
		||||
				asking_hash: None,
 | 
			
		||||
				ask_time: Instant::now(),
 | 
			
		||||
				last_sent_transactions: HashSet::new(),
 | 
			
		||||
				last_sent_private_transactions: HashSet::new(),
 | 
			
		||||
				last_sent_transactions: Default::default(),
 | 
			
		||||
				last_sent_private_transactions: Default::default(),
 | 
			
		||||
				expired: false,
 | 
			
		||||
				confirmation: ForkConfirmation::Confirmed,
 | 
			
		||||
				snapshot_number: None,
 | 
			
		||||
@ -447,13 +466,13 @@ mod tests {
 | 
			
		||||
		let queue = RwLock::new(VecDeque::new());
 | 
			
		||||
		let ss = TestSnapshotService::new();
 | 
			
		||||
		let mut io = TestIo::new(&mut client, &ss, &queue, None);
 | 
			
		||||
		let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
 | 
			
		||||
		let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
 | 
			
		||||
		// Try to propagate same transactions for the second time
 | 
			
		||||
		let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
 | 
			
		||||
		let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
 | 
			
		||||
		// Even after new block transactions should not be propagated twice
 | 
			
		||||
		sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]);
 | 
			
		||||
		// Try to propagate same transactions for the third time
 | 
			
		||||
		let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
 | 
			
		||||
		let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
 | 
			
		||||
 | 
			
		||||
		// 1 message should be send
 | 
			
		||||
		assert_eq!(1, io.packets.len());
 | 
			
		||||
@ -474,7 +493,7 @@ mod tests {
 | 
			
		||||
		let queue = RwLock::new(VecDeque::new());
 | 
			
		||||
		let ss = TestSnapshotService::new();
 | 
			
		||||
		let mut io = TestIo::new(&mut client, &ss, &queue, None);
 | 
			
		||||
		let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
 | 
			
		||||
		let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
 | 
			
		||||
		io.chain.insert_transaction_to_queue();
 | 
			
		||||
		// New block import should not trigger propagation.
 | 
			
		||||
		// (we only propagate on timeout)
 | 
			
		||||
@ -498,10 +517,10 @@ mod tests {
 | 
			
		||||
		let queue = RwLock::new(VecDeque::new());
 | 
			
		||||
		let ss = TestSnapshotService::new();
 | 
			
		||||
		let mut io = TestIo::new(&mut client, &ss, &queue, None);
 | 
			
		||||
		let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
 | 
			
		||||
		let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
 | 
			
		||||
		sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]);
 | 
			
		||||
		// Try to propagate same transactions for the second time
 | 
			
		||||
		let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
 | 
			
		||||
		let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
 | 
			
		||||
 | 
			
		||||
		assert_eq!(0, io.packets.len());
 | 
			
		||||
		assert_eq!(0, peer_count);
 | 
			
		||||
@ -519,7 +538,7 @@ mod tests {
 | 
			
		||||
		// should sent some
 | 
			
		||||
		{
 | 
			
		||||
			let mut io = TestIo::new(&mut client, &ss, &queue, None);
 | 
			
		||||
			let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
 | 
			
		||||
			let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
 | 
			
		||||
			assert_eq!(1, io.packets.len());
 | 
			
		||||
			assert_eq!(1, peer_count);
 | 
			
		||||
		}
 | 
			
		||||
@ -528,9 +547,9 @@ mod tests {
 | 
			
		||||
		let (peer_count2, peer_count3) = {
 | 
			
		||||
			let mut io = TestIo::new(&mut client, &ss, &queue, None);
 | 
			
		||||
			// Propagate new transactions
 | 
			
		||||
			let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
 | 
			
		||||
			let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
 | 
			
		||||
			// And now the peer should have all transactions
 | 
			
		||||
			let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
 | 
			
		||||
			let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
 | 
			
		||||
			(peer_count2, peer_count3)
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
@ -553,7 +572,7 @@ mod tests {
 | 
			
		||||
		let queue = RwLock::new(VecDeque::new());
 | 
			
		||||
		let ss = TestSnapshotService::new();
 | 
			
		||||
		let mut io = TestIo::new(&mut client, &ss, &queue, None);
 | 
			
		||||
		SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
 | 
			
		||||
		SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
 | 
			
		||||
 | 
			
		||||
		let stats = sync.transactions_stats();
 | 
			
		||||
		assert_eq!(stats.len(), 1, "Should maintain stats for single transaction.")
 | 
			
		||||
@ -583,7 +602,7 @@ mod tests {
 | 
			
		||||
		io.peers_info.insert(4, "Parity-Ethereum/v2.7.3-ABCDEFGH".to_owned());
 | 
			
		||||
 | 
			
		||||
		// and new service transaction is propagated to peers
 | 
			
		||||
		SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
 | 
			
		||||
		SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
 | 
			
		||||
 | 
			
		||||
		// peer#2 && peer#4 are receiving service transaction
 | 
			
		||||
		assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 2)); // TRANSACTIONS_PACKET
 | 
			
		||||
@ -607,7 +626,7 @@ mod tests {
 | 
			
		||||
		io.peers_info.insert(1, "Parity-Ethereum/v2.6".to_owned());
 | 
			
		||||
 | 
			
		||||
		// and service + non-service transactions are propagated to peers
 | 
			
		||||
		SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
 | 
			
		||||
		SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
 | 
			
		||||
 | 
			
		||||
		// two separate packets for peer are queued:
 | 
			
		||||
		// 1) with non-service-transaction
 | 
			
		||||
 | 
			
		||||
@ -27,6 +27,7 @@ use sync_io::SyncIo;
 | 
			
		||||
 | 
			
		||||
use super::{
 | 
			
		||||
	ChainSync,
 | 
			
		||||
	SyncHandler,
 | 
			
		||||
	RlpResponseResult,
 | 
			
		||||
	PacketDecodeError,
 | 
			
		||||
	BLOCK_BODIES_PACKET,
 | 
			
		||||
@ -47,6 +48,8 @@ use super::{
 | 
			
		||||
	RECEIPTS_PACKET,
 | 
			
		||||
	SNAPSHOT_DATA_PACKET,
 | 
			
		||||
	SNAPSHOT_MANIFEST_PACKET,
 | 
			
		||||
	STATUS_PACKET,
 | 
			
		||||
	TRANSACTIONS_PACKET,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
/// The Chain Sync Supplier: answers requests from peers with available data
 | 
			
		||||
@ -56,6 +59,7 @@ impl SyncSupplier {
 | 
			
		||||
	/// Dispatch incoming requests and responses
 | 
			
		||||
	pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
 | 
			
		||||
		let rlp = Rlp::new(data);
 | 
			
		||||
 | 
			
		||||
		let result = match packet_id {
 | 
			
		||||
			GET_BLOCK_BODIES_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
 | 
			
		||||
				SyncSupplier::return_block_bodies,
 | 
			
		||||
@ -80,9 +84,39 @@ impl SyncSupplier {
 | 
			
		||||
			GET_SNAPSHOT_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
 | 
			
		||||
				SyncSupplier::return_snapshot_data,
 | 
			
		||||
				|e| format!("Error sending snapshot data: {:?}", e)),
 | 
			
		||||
			CONSENSUS_DATA_PACKET => ChainSync::on_consensus_packet(io, peer, &rlp),
 | 
			
		||||
			_ => {
 | 
			
		||||
 | 
			
		||||
			STATUS_PACKET => {
 | 
			
		||||
				sync.write().on_packet(io, peer, packet_id, data);
 | 
			
		||||
				Ok(())
 | 
			
		||||
			},
 | 
			
		||||
			// Packets that require the peer to be confirmed
 | 
			
		||||
			_ => {
 | 
			
		||||
				if !sync.read().peers.contains_key(&peer) {
 | 
			
		||||
					debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer));
 | 
			
		||||
					return;
 | 
			
		||||
				}
 | 
			
		||||
				debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id);
 | 
			
		||||
 | 
			
		||||
				match packet_id {
 | 
			
		||||
					CONSENSUS_DATA_PACKET => {
 | 
			
		||||
						SyncHandler::on_consensus_packet(io, peer, &rlp)
 | 
			
		||||
					},
 | 
			
		||||
					TRANSACTIONS_PACKET => {
 | 
			
		||||
						let res = {
 | 
			
		||||
							let sync_ro = sync.read();
 | 
			
		||||
							SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp)
 | 
			
		||||
						};
 | 
			
		||||
						if res.is_err() {
 | 
			
		||||
							// peer sent invalid data, disconnect.
 | 
			
		||||
							io.disable_peer(peer);
 | 
			
		||||
							sync.write().deactivate_peer(io, peer);
 | 
			
		||||
						}
 | 
			
		||||
					},
 | 
			
		||||
					_ => {
 | 
			
		||||
						sync.write().on_packet(io, peer, packet_id, data);
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				Ok(())
 | 
			
		||||
			}
 | 
			
		||||
		};
 | 
			
		||||
@ -404,7 +438,7 @@ mod test {
 | 
			
		||||
 | 
			
		||||
		io.sender = Some(2usize);
 | 
			
		||||
 | 
			
		||||
		ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_NODE_DATA_PACKET, &node_request);
 | 
			
		||||
		SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_NODE_DATA_PACKET, &node_request);
 | 
			
		||||
		assert_eq!(1, io.packets.len());
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -446,7 +480,7 @@ mod test {
 | 
			
		||||
		assert_eq!(603, rlp_result.unwrap().1.out().len());
 | 
			
		||||
 | 
			
		||||
		io.sender = Some(2usize);
 | 
			
		||||
		ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_RECEIPTS_PACKET, &receipts_request);
 | 
			
		||||
		SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_RECEIPTS_PACKET, &receipts_request);
 | 
			
		||||
		assert_eq!(1, io.packets.len());
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -52,7 +52,7 @@ pub trait SyncIo {
 | 
			
		||||
	fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8;
 | 
			
		||||
	/// Returns if the chain block queue empty
 | 
			
		||||
	fn is_chain_queue_empty(&self) -> bool {
 | 
			
		||||
		self.chain().queue_info().is_empty()
 | 
			
		||||
		self.chain().is_queue_empty()
 | 
			
		||||
	}
 | 
			
		||||
	/// Check if the session is expired
 | 
			
		||||
	fn is_expired(&self) -> bool;
 | 
			
		||||
 | 
			
		||||
@ -33,7 +33,7 @@ use ethcore::test_helpers;
 | 
			
		||||
use sync_io::SyncIo;
 | 
			
		||||
use io::{IoChannel, IoContext, IoHandler};
 | 
			
		||||
use api::WARP_SYNC_PROTOCOL_ID;
 | 
			
		||||
use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
 | 
			
		||||
use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET, SyncSupplier};
 | 
			
		||||
use SyncConfig;
 | 
			
		||||
use private_tx::SimplePrivateTxHandler;
 | 
			
		||||
 | 
			
		||||
@ -271,7 +271,7 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
 | 
			
		||||
 | 
			
		||||
	fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> {
 | 
			
		||||
		let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(from));
 | 
			
		||||
		ChainSync::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data);
 | 
			
		||||
		SyncSupplier::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data);
 | 
			
		||||
		self.chain.flush();
 | 
			
		||||
		io.to_disconnect.clone()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,7 @@
 | 
			
		||||
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
use api::TransactionStats;
 | 
			
		||||
use std::hash::BuildHasher;
 | 
			
		||||
use std::collections::{HashSet, HashMap};
 | 
			
		||||
use ethereum_types::{H256, H512};
 | 
			
		||||
use fastmap::H256FastMap;
 | 
			
		||||
@ -74,7 +75,7 @@ impl TransactionsStats {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Retains only transactions present in given `HashSet`.
 | 
			
		||||
	pub fn retain(&mut self, hashes: &HashSet<H256>) {
 | 
			
		||||
	pub fn retain<S: BuildHasher>(&mut self, hashes: &HashSet<H256, S>) {
 | 
			
		||||
		let to_remove = self.pending_transactions.keys()
 | 
			
		||||
			.filter(|hash| !hashes.contains(hash))
 | 
			
		||||
			.cloned()
 | 
			
		||||
 | 
			
		||||
@ -50,6 +50,10 @@ impl Notifier {
 | 
			
		||||
 | 
			
		||||
	/// Notify listeners about all currently pending transactions.
 | 
			
		||||
	pub fn notify(&mut self) {
 | 
			
		||||
		if self.pending.is_empty() {
 | 
			
		||||
			return;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for l in &self.listeners {
 | 
			
		||||
			(l)(&self.pending);
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
@ -14,7 +14,7 @@
 | 
			
		||||
// You should have received a copy of the GNU General Public License
 | 
			
		||||
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::sync::{Arc, mpsc};
 | 
			
		||||
 | 
			
		||||
use ethcore::client::BlockChainClient;
 | 
			
		||||
use sync::{self, AttachedProtocol, SyncConfig, NetworkConfiguration, Params, ConnectionFilter};
 | 
			
		||||
@ -25,12 +25,17 @@ pub use sync::{EthSync, SyncProvider, ManageNetwork, PrivateTxHandler};
 | 
			
		||||
pub use ethcore::client::ChainNotify;
 | 
			
		||||
use ethcore_logger::Config as LogConfig;
 | 
			
		||||
 | 
			
		||||
pub type SyncModules = (Arc<SyncProvider>, Arc<ManageNetwork>, Arc<ChainNotify>);
 | 
			
		||||
pub type SyncModules = (
 | 
			
		||||
	Arc<SyncProvider>,
 | 
			
		||||
	Arc<ManageNetwork>,
 | 
			
		||||
	Arc<ChainNotify>,
 | 
			
		||||
	mpsc::Sender<sync::PriorityTask>,
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
pub fn sync(
 | 
			
		||||
	sync_cfg: SyncConfig,
 | 
			
		||||
	net_cfg: NetworkConfiguration,
 | 
			
		||||
	client: Arc<BlockChainClient>,
 | 
			
		||||
	config: SyncConfig,
 | 
			
		||||
	network_config: NetworkConfiguration,
 | 
			
		||||
	chain: Arc<BlockChainClient>,
 | 
			
		||||
	snapshot_service: Arc<SnapshotService>,
 | 
			
		||||
	private_tx_handler: Arc<PrivateTxHandler>,
 | 
			
		||||
	provider: Arc<Provider>,
 | 
			
		||||
@ -39,15 +44,20 @@ pub fn sync(
 | 
			
		||||
	connection_filter: Option<Arc<ConnectionFilter>>,
 | 
			
		||||
) -> Result<SyncModules, sync::Error> {
 | 
			
		||||
	let eth_sync = EthSync::new(Params {
 | 
			
		||||
		config: sync_cfg,
 | 
			
		||||
		chain: client,
 | 
			
		||||
		provider: provider,
 | 
			
		||||
		snapshot_service: snapshot_service,
 | 
			
		||||
		config,
 | 
			
		||||
		chain,
 | 
			
		||||
		provider,
 | 
			
		||||
		snapshot_service,
 | 
			
		||||
		private_tx_handler,
 | 
			
		||||
		network_config: net_cfg,
 | 
			
		||||
		attached_protos: attached_protos,
 | 
			
		||||
		network_config,
 | 
			
		||||
		attached_protos,
 | 
			
		||||
	},
 | 
			
		||||
	connection_filter)?;
 | 
			
		||||
 | 
			
		||||
	Ok((eth_sync.clone() as Arc<SyncProvider>, eth_sync.clone() as Arc<ManageNetwork>, eth_sync.clone() as Arc<ChainNotify>))
 | 
			
		||||
	Ok((
 | 
			
		||||
		eth_sync.clone() as Arc<SyncProvider>,
 | 
			
		||||
		eth_sync.clone() as Arc<ManageNetwork>,
 | 
			
		||||
		eth_sync.clone() as Arc<ChainNotify>,
 | 
			
		||||
		eth_sync.priority_tasks()
 | 
			
		||||
	))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -15,7 +15,7 @@
 | 
			
		||||
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
use std::any::Any;
 | 
			
		||||
use std::sync::{Arc, Weak};
 | 
			
		||||
use std::sync::{Arc, Weak, atomic};
 | 
			
		||||
use std::time::{Duration, Instant};
 | 
			
		||||
use std::thread;
 | 
			
		||||
 | 
			
		||||
@ -480,7 +480,6 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
 | 
			
		||||
		cmd.gas_pricer_conf.to_gas_pricer(fetch.clone(), runtime.executor()),
 | 
			
		||||
		&spec,
 | 
			
		||||
		Some(account_provider.clone()),
 | 
			
		||||
 | 
			
		||||
	));
 | 
			
		||||
	miner.set_author(cmd.miner_extras.author, None).expect("Fails only if password is Some; password is None; qed");
 | 
			
		||||
	miner.set_gas_range_target(cmd.miner_extras.gas_range_target);
 | 
			
		||||
@ -637,7 +636,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
 | 
			
		||||
	};
 | 
			
		||||
 | 
			
		||||
	// create sync object
 | 
			
		||||
	let (sync_provider, manage_network, chain_notify) = modules::sync(
 | 
			
		||||
	let (sync_provider, manage_network, chain_notify, priority_tasks) = modules::sync(
 | 
			
		||||
		sync_config,
 | 
			
		||||
		net_conf.clone().into(),
 | 
			
		||||
		client.clone(),
 | 
			
		||||
@ -651,6 +650,18 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
 | 
			
		||||
 | 
			
		||||
	service.add_notify(chain_notify.clone());
 | 
			
		||||
 | 
			
		||||
	// Propagate transactions as soon as they are imported.
 | 
			
		||||
	let tx = ::parking_lot::Mutex::new(priority_tasks);
 | 
			
		||||
	let is_ready = Arc::new(atomic::AtomicBool::new(true));
 | 
			
		||||
	miner.add_transactions_listener(Box::new(move |_hashes| {
 | 
			
		||||
		// we want to have only one PendingTransactions task in the queue.
 | 
			
		||||
		if is_ready.compare_and_swap(true, false, atomic::Ordering::SeqCst) {
 | 
			
		||||
			let task = ::sync::PriorityTask::PropagateTransactions(Instant::now(), is_ready.clone());
 | 
			
		||||
			// we ignore error cause it means that we are closing
 | 
			
		||||
			let _ = tx.lock().send(task);
 | 
			
		||||
		}
 | 
			
		||||
	}));
 | 
			
		||||
 | 
			
		||||
	// provider not added to a notification center is effectively disabled
 | 
			
		||||
	// TODO [debris] refactor it later on
 | 
			
		||||
	if cmd.private_tx_enabled {
 | 
			
		||||
@ -737,7 +748,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
 | 
			
		||||
	let secretstore_deps = secretstore::Dependencies {
 | 
			
		||||
		client: client.clone(),
 | 
			
		||||
		sync: sync_provider.clone(),
 | 
			
		||||
		miner: miner,
 | 
			
		||||
		miner: miner.clone(),
 | 
			
		||||
		account_provider: account_provider,
 | 
			
		||||
		accounts_passwords: &passwords,
 | 
			
		||||
	};
 | 
			
		||||
 | 
			
		||||
@ -21,11 +21,13 @@ extern crate plain_hasher;
 | 
			
		||||
 | 
			
		||||
use ethereum_types::H256;
 | 
			
		||||
use std::hash;
 | 
			
		||||
use std::collections::HashMap;
 | 
			
		||||
use std::collections::{HashMap, HashSet};
 | 
			
		||||
use plain_hasher::PlainHasher;
 | 
			
		||||
 | 
			
		||||
/// Specialized version of `HashMap` with H256 keys and fast hashing function.
 | 
			
		||||
pub type H256FastMap<T> = HashMap<H256, T, hash::BuildHasherDefault<PlainHasher>>;
 | 
			
		||||
/// Specialized version of HashSet with H256 values and fast hashing function.
 | 
			
		||||
pub type H256FastSet = HashSet<H256, hash::BuildHasherDefault<PlainHasher>>;
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod tests {
 | 
			
		||||
@ -36,4 +38,4 @@ mod tests {
 | 
			
		||||
        let mut h = H256FastMap::default();
 | 
			
		||||
        h.insert(H256::from(123), "abc");
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user