From 0b5bbf60489417ee71e612c4669a8c83a8e1af4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 28 Nov 2018 10:30:05 +0000 Subject: [PATCH] Improve block and transaction propagation (#9954) * Refactor sync to add priority tasks. * Send priority tasks notifications. * Propagate blocks, optimize transactions. * Implement transaction propagation. Use sync_channel. * Tone down info. * Prevent deadlock by not waiting forever for sync lock. * Fix lock order. * Don't use sync_channel to prevent deadlocks. * Fix tests. --- ethcore/service/src/service.rs | 8 +- ethcore/src/client/chain_notify.rs | 12 +- ethcore/src/client/client.rs | 36 ++- ethcore/src/client/traits.rs | 5 + ethcore/src/miner/miner.rs | 2 +- ethcore/src/verification/queue/mod.rs | 7 + ethcore/sync/src/api.rs | 95 +++++-- ethcore/sync/src/chain/handler.rs | 16 +- ethcore/sync/src/chain/mod.rs | 361 ++++++++++++++++++------- ethcore/sync/src/chain/propagator.rs | 275 ++++++++++--------- ethcore/sync/src/chain/supplier.rs | 42 ++- ethcore/sync/src/sync_io.rs | 2 +- ethcore/sync/src/tests/helpers.rs | 4 +- ethcore/sync/src/transactions_stats.rs | 3 +- miner/src/pool/listener.rs | 4 + parity/modules.rs | 34 ++- parity/run.rs | 19 +- util/fastmap/src/lib.rs | 6 +- 18 files changed, 631 insertions(+), 300 deletions(-) diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index 1763b8fd5..77429a4e5 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -106,7 +106,13 @@ impl ClientService { info!("Configured for {} using {} engine", Colour::White.bold().paint(spec.name.clone()), Colour::Yellow.bold().paint(spec.engine.name())); let pruning = config.pruning; - let client = Client::new(config, &spec, blockchain_db.clone(), miner.clone(), io_service.channel())?; + let client = Client::new( + config, + &spec, + blockchain_db.clone(), + miner.clone(), + io_service.channel(), + )?; miner.set_io_channel(io_service.channel()); miner.set_in_chain_checker(&client.clone()); diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index ebfe7bdef..3d576ae12 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use bytes::Bytes; -use ethereum_types::H256; +use ethereum_types::{H256, U256}; use transaction::UnverifiedTransaction; use blockchain::ImportRoute; use std::time::Duration; @@ -141,7 +141,15 @@ pub trait ChainNotify : Send + Sync { } /// fires when chain broadcasts a message - fn broadcast(&self, _message_type: ChainMessageType) {} + fn broadcast(&self, _message_type: ChainMessageType) { + // does nothing by default + } + + /// fires when new block is about to be imported + /// implementations should be light + fn block_pre_import(&self, _bytes: &Bytes, _hash: &H256, _difficulty: &U256) { + // does nothing by default + } /// fires when new transactions are received from a peer fn transactions_received(&self, diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index d5ae9d80f..00dc9cab5 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -881,7 +881,7 @@ impl Client { /// Flush the block import queue. pub fn flush_queue(&self) { self.importer.block_queue.flush(); - while !self.importer.block_queue.queue_info().is_empty() { + while !self.importer.block_queue.is_empty() { self.import_verified_blocks(); } } @@ -1423,8 +1423,21 @@ impl ImportBlock for Client { bail!(EthcoreErrorKind::Block(BlockError::UnknownParent(unverified.parent_hash()))); } + let raw = if self.importer.block_queue.is_empty() { + Some(( + unverified.bytes.clone(), + unverified.header.hash(), + *unverified.header.difficulty(), + )) + } else { None }; + match self.importer.block_queue.import(unverified) { - Ok(res) => Ok(res), + Ok(hash) => { + if let Some((raw, hash, difficulty)) = raw { + self.notify(move |n| n.block_pre_import(&raw, &hash, &difficulty)); + } + Ok(hash) + }, // we only care about block errors (not import errors) Err((block, EthcoreError(EthcoreErrorKind::Block(err), _))) => { self.importer.bad_blocks.report(block.bytes, format!("{:?}", err)); @@ -1878,6 +1891,10 @@ impl BlockChainClient for Client { self.importer.block_queue.queue_info() } + fn is_queue_empty(&self) -> bool { + self.importer.block_queue.is_empty() + } + fn clear_queue(&self) { self.importer.block_queue.clear(); } @@ -2288,7 +2305,11 @@ impl ScheduleInfo for Client { impl ImportSealedBlock for Client { fn import_sealed_block(&self, block: SealedBlock) -> EthcoreResult { let start = Instant::now(); + let raw = block.rlp_bytes(); let header = block.header().clone(); + let hash = header.hash(); + self.notify(|n| n.block_pre_import(&raw, &hash, header.difficulty())); + let route = { // Do a super duper basic verification to detect potential bugs if let Err(e) = self.engine.verify_block_basic(&header) { @@ -2306,15 +2327,14 @@ impl ImportSealedBlock for Client { let block_data = block.rlp_bytes(); let route = self.importer.commit_block(block, &header, encoded::Block::new(block_data), self); - trace!(target: "client", "Imported sealed block #{} ({})", header.number(), header.hash()); + trace!(target: "client", "Imported sealed block #{} ({})", header.number(), hash); self.state_db.write().sync_cache(&route.enacted, &route.retracted, false); route }; - let h = header.hash(); let route = ChainRoute::from([route].as_ref()); self.importer.miner.chain_new_blocks( self, - &[h], + &[hash], &[], route.enacted(), route.retracted(), @@ -2322,16 +2342,16 @@ impl ImportSealedBlock for Client { ); self.notify(|notify| { notify.new_blocks( - vec![h], + vec![hash], vec![], route.clone(), - vec![h], + vec![hash], vec![], start.elapsed(), ); }); self.db.read().key_value().flush().expect("DB flush failed."); - Ok(h) + Ok(hash) } } diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index 5b78a54b3..55d527013 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -300,6 +300,11 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra /// Get block queue information. fn queue_info(&self) -> BlockQueueInfo; + /// Returns true if block queue is empty. + fn is_queue_empty(&self) -> bool { + self.queue_info().is_empty() + } + /// Clear block queue and abort all import activity. fn clear_queue(&self); diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 39dac1f2b..c73887800 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -576,7 +576,7 @@ impl Miner { trace!(target: "miner", "requires_reseal: sealing enabled"); // Disable sealing if there were no requests for SEALING_TIMEOUT_IN_BLOCKS - let had_requests = sealing.last_request.map(|last_request| + let had_requests = sealing.last_request.map(|last_request| best_block.saturating_sub(last_request) <= SEALING_TIMEOUT_IN_BLOCKS ).unwrap_or(false); diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 9b1597439..b9242f47b 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -583,6 +583,13 @@ impl VerificationQueue { result } + /// Returns true if there is nothing currently in the queue. + /// TODO [ToDr] Optimize to avoid locking + pub fn is_empty(&self) -> bool { + let v = &self.verification; + v.unverified.lock().is_empty() && v.verifying.lock().is_empty() && v.verified.lock().is_empty() + } + /// Get queue status. pub fn queue_info(&self) -> QueueInfo { use std::mem::size_of; diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 4296a4c2d..6fef887a0 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::sync::Arc; +use std::sync::{Arc, mpsc, atomic}; use std::collections::{HashMap, BTreeMap}; use std::io; use std::ops::Range; @@ -33,10 +33,10 @@ use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainMessageTyp use ethcore::snapshot::SnapshotService; use ethcore::header::BlockNumber; use sync_io::NetSyncIo; -use chain::{ChainSync, SyncStatus as EthSyncStatus}; +use chain::{ChainSyncApi, SyncStatus as EthSyncStatus}; use std::net::{SocketAddr, AddrParseError}; use std::str::FromStr; -use parking_lot::RwLock; +use parking_lot::{RwLock, Mutex}; use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET}; @@ -228,6 +228,37 @@ impl AttachedProtocol { } } +/// A prioritized tasks run in a specialised timer. +/// Every task should be completed within a hard deadline, +/// if it's not it's either cancelled or split into multiple tasks. +/// NOTE These tasks might not complete at all, so anything +/// that happens here should work even if the task is cancelled. +#[derive(Debug)] +pub enum PriorityTask { + /// Propagate given block + PropagateBlock { + /// When the task was initiated + started: ::std::time::Instant, + /// Raw block RLP to propagate + block: Bytes, + /// Block hash + hash: H256, + /// Blocks difficulty + difficulty: U256, + }, + /// Propagate a list of transactions + PropagateTransactions(::std::time::Instant, Arc), +} +impl PriorityTask { + /// Mark the task as being processed, right after it's retrieved from the queue. + pub fn starting(&self) { + match *self { + PriorityTask::PropagateTransactions(_, ref is_ready) => is_ready.store(true, atomic::Ordering::SeqCst), + _ => {}, + } + } +} + /// EthSync initialization parameters. pub struct Params { /// Configuration. @@ -260,6 +291,8 @@ pub struct EthSync { subprotocol_name: [u8; 3], /// Light subprotocol name. light_subprotocol_name: [u8; 3], + /// Priority tasks notification channel + priority_tasks: Mutex>, } fn light_params( @@ -312,13 +345,19 @@ impl EthSync { }) }; - let chain_sync = ChainSync::new(params.config, &*params.chain, params.private_tx_handler.clone()); + let (priority_tasks_tx, priority_tasks_rx) = mpsc::channel(); + let sync = ChainSyncApi::new( + params.config, + &*params.chain, + params.private_tx_handler.clone(), + priority_tasks_rx, + ); let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?; let sync = Arc::new(EthSync { network: service, eth_handler: Arc::new(SyncProtocolHandler { - sync: RwLock::new(chain_sync), + sync, chain: params.chain, snapshot_service: params.snapshot_service, overlay: RwLock::new(HashMap::new()), @@ -327,26 +366,32 @@ impl EthSync { subprotocol_name: params.config.subprotocol_name, light_subprotocol_name: params.config.light_subprotocol_name, attached_protos: params.attached_protos, + priority_tasks: Mutex::new(priority_tasks_tx), }); Ok(sync) } + + /// Priority tasks producer + pub fn priority_tasks(&self) -> mpsc::Sender { + self.priority_tasks.lock().clone() + } } impl SyncProvider for EthSync { /// Get sync status fn status(&self) -> EthSyncStatus { - self.eth_handler.sync.read().status() + self.eth_handler.sync.status() } /// Get sync peers fn peers(&self) -> Vec { self.network.with_context_eval(self.subprotocol_name, |ctx| { let peer_ids = self.network.connected_peers(); - let eth_sync = self.eth_handler.sync.read(); let light_proto = self.light_proto.as_ref(); - peer_ids.into_iter().filter_map(|peer_id| { + let peer_info = self.eth_handler.sync.peer_info(&peer_ids); + peer_ids.into_iter().zip(peer_info).filter_map(|(peer_id, peer_info)| { let session_info = match ctx.session_info(peer_id) { None => return None, Some(info) => info, @@ -358,7 +403,7 @@ impl SyncProvider for EthSync { capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(), remote_address: session_info.remote_address, local_address: session_info.local_address, - eth_info: eth_sync.peer_info(&peer_id), + eth_info: peer_info, pip_info: light_proto.as_ref().and_then(|lp| lp.peer_status(peer_id)).map(Into::into), }) }).collect() @@ -370,17 +415,16 @@ impl SyncProvider for EthSync { } fn transactions_stats(&self) -> BTreeMap { - let sync = self.eth_handler.sync.read(); - sync.transactions_stats() - .iter() - .map(|(hash, stats)| (*hash, stats.into())) - .collect() + self.eth_handler.sync.transactions_stats() } } const PEERS_TIMER: TimerToken = 0; const SYNC_TIMER: TimerToken = 1; const TX_TIMER: TimerToken = 2; +const PRIORITY_TIMER: TimerToken = 3; + +pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250); struct SyncProtocolHandler { /// Shared blockchain client. @@ -388,7 +432,7 @@ struct SyncProtocolHandler { /// Shared snapshot service. snapshot_service: Arc, /// Sync strategy - sync: RwLock, + sync: ChainSyncApi, /// Chain overlay used to cache data such as fork block. overlay: RwLock>, } @@ -399,11 +443,13 @@ impl NetworkProtocolHandler for SyncProtocolHandler { io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer"); io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer"); + + io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL).expect("Error registering peers timer"); } } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { - ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data); + self.sync.dispatch_packet(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data); } fn connected(&self, io: &NetworkContext, peer: &PeerId) { @@ -429,15 +475,26 @@ impl NetworkProtocolHandler for SyncProtocolHandler { match timer { PEERS_TIMER => self.sync.write().maintain_peers(&mut io), SYNC_TIMER => self.sync.write().maintain_sync(&mut io), - TX_TIMER => { - self.sync.write().propagate_new_transactions(&mut io); - }, + TX_TIMER => self.sync.write().propagate_new_transactions(&mut io), + PRIORITY_TIMER => self.sync.process_priority_queue(&mut io), _ => warn!("Unknown timer {} triggered.", timer), } } } impl ChainNotify for EthSync { + fn block_pre_import(&self, bytes: &Bytes, hash: &H256, difficulty: &U256) { + let task = PriorityTask::PropagateBlock { + started: ::std::time::Instant::now(), + block: bytes.clone(), + hash: *hash, + difficulty: *difficulty, + }; + if let Err(e) = self.priority_tasks.lock().send(task) { + warn!(target: "sync", "Unexpected error during priority block propagation: {:?}", e); + } + } + fn new_blocks(&self, imported: Vec, invalid: Vec, diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index e67029743..104a80320 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -29,7 +29,6 @@ use rlp::Rlp; use snapshot::ChunkType; use std::cmp; use std::mem; -use std::collections::HashSet; use std::time::Instant; use sync_io::SyncIo; @@ -58,7 +57,6 @@ use super::{ SNAPSHOT_DATA_PACKET, SNAPSHOT_MANIFEST_PACKET, STATUS_PACKET, - TRANSACTIONS_PACKET, }; /// The Chain Sync Handler: handles responses from peers @@ -67,14 +65,9 @@ pub struct SyncHandler; impl SyncHandler { /// Handle incoming packet from peer pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { - if packet_id != STATUS_PACKET && !sync.peers.contains_key(&peer) { - debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer)); - return; - } let rlp = Rlp::new(data); let result = match packet_id { STATUS_PACKET => SyncHandler::on_peer_status(sync, io, peer, &rlp), - TRANSACTIONS_PACKET => SyncHandler::on_peer_transactions(sync, io, peer, &rlp), BLOCK_HEADERS_PACKET => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp), BLOCK_BODIES_PACKET => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp), RECEIPTS_PACKET => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp), @@ -109,10 +102,9 @@ impl SyncHandler { } /// Called when peer sends us new consensus packet - pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) { trace!(target: "sync", "Received consensus packet from {:?}", peer_id); io.chain().queue_consensus_message(r.as_raw().to_vec()); - Ok(()) } /// Called by peer when it is disconnecting @@ -578,8 +570,8 @@ impl SyncHandler { asking_blocks: Vec::new(), asking_hash: None, ask_time: Instant::now(), - last_sent_transactions: HashSet::new(), - last_sent_private_transactions: HashSet::new(), + last_sent_transactions: Default::default(), + last_sent_private_transactions: Default::default(), expired: false, confirmation: if sync.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed }, asking_snapshot_data: None, @@ -635,7 +627,7 @@ impl SyncHandler { } /// Called when peer sends us new transactions - fn on_peer_transactions(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { + pub fn on_peer_transactions(sync: &ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { // Accept transactions only when fully synced if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) { trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id); diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index e0fc8ecdd..cdedd5630 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -92,17 +92,17 @@ mod propagator; mod requester; mod supplier; -use std::sync::Arc; -use std::collections::{HashSet, HashMap}; +use std::sync::{Arc, mpsc}; +use std::collections::{HashSet, HashMap, BTreeMap}; use std::cmp; use std::time::{Duration, Instant}; use hash::keccak; use heapsize::HeapSizeOf; use ethereum_types::{H256, U256}; -use fastmap::H256FastMap; -use parking_lot::RwLock; +use fastmap::{H256FastMap, H256FastSet}; +use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; use bytes::Bytes; -use rlp::{Rlp, RlpStream, DecoderError}; +use rlp::{RlpStream, DecoderError}; use network::{self, PeerId, PacketId}; use ethcore::header::{BlockNumber}; use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo}; @@ -112,7 +112,7 @@ use super::{WarpSync, SyncConfig}; use block_sync::{BlockDownloader, DownloadAction}; use rand::Rng; use snapshot::{Snapshot}; -use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID}; +use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID, PriorityTask}; use private_tx::PrivateTxHandler; use transactions_stats::{TransactionsStats, Stats as TransactionStats}; use transaction::UnverifiedTransaction; @@ -120,7 +120,7 @@ use transaction::UnverifiedTransaction; use self::handler::SyncHandler; use self::propagator::SyncPropagator; use self::requester::SyncRequester; -use self::supplier::SyncSupplier; +pub(crate) use self::supplier::SyncSupplier; known_heap_size!(0, PeerInfo); @@ -187,6 +187,11 @@ const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3); const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5); const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120); +/// Defines how much time we have to complete priority transaction or block propagation. +/// after the deadline is reached the task is considered finished +/// (so we might sent only to some part of the peers we originally intended to send to) +const PRIORITY_TASK_DEADLINE: Duration = Duration::from_millis(100); + #[derive(Copy, Clone, Eq, PartialEq, Debug)] /// Sync state pub enum SyncState { @@ -323,9 +328,9 @@ pub struct PeerInfo { /// Request timestamp ask_time: Instant, /// Holds a set of transactions recently sent to this peer to avoid spamming. - last_sent_transactions: HashSet, + last_sent_transactions: H256FastSet, /// Holds a set of private transactions and their signatures recently sent to this peer to avoid spamming. - last_sent_private_transactions: HashSet, + last_sent_private_transactions: H256FastSet, /// Pending request is expired and result should be ignored expired: bool, /// Peer fork confirmation status @@ -375,6 +380,217 @@ pub mod random { pub type RlpResponseResult = Result, PacketDecodeError>; pub type Peers = HashMap; +/// Thread-safe wrapper for `ChainSync`. +/// +/// NOTE always lock in order of fields declaration +pub struct ChainSyncApi { + /// Priority tasks queue + priority_tasks: Mutex>, + /// The rest of sync data + sync: RwLock, +} + +impl ChainSyncApi { + /// Creates new `ChainSyncApi` + pub fn new( + config: SyncConfig, + chain: &BlockChainClient, + private_tx_handler: Arc, + priority_tasks: mpsc::Receiver, + ) -> Self { + ChainSyncApi { + sync: RwLock::new(ChainSync::new(config, chain, private_tx_handler)), + priority_tasks: Mutex::new(priority_tasks), + } + } + + /// Gives `write` access to underlying `ChainSync` + pub fn write(&self) -> RwLockWriteGuard { + self.sync.write() + } + + /// Returns info about given list of peers + pub fn peer_info(&self, ids: &[PeerId]) -> Vec> { + let sync = self.sync.read(); + ids.iter().map(|id| sync.peer_info(id)).collect() + } + + /// Returns synchonization status + pub fn status(&self) -> SyncStatus { + self.sync.read().status() + } + + /// Returns transactions propagation statistics + pub fn transactions_stats(&self) -> BTreeMap { + self.sync.read().transactions_stats() + .iter() + .map(|(hash, stats)| (*hash, stats.into())) + .collect() + } + + /// Dispatch incoming requests and responses + pub fn dispatch_packet(&self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { + SyncSupplier::dispatch_packet(&self.sync, io, peer, packet_id, data) + } + + /// Process a priority propagation queue. + /// This task is run from a timer and should be time constrained. + /// Hence we set up a deadline for the execution and cancel the task if the deadline is exceeded. + /// + /// NOTE This method should only handle stuff that can be canceled and would reach other peers + /// by other means. + pub fn process_priority_queue(&self, io: &mut SyncIo) { + fn check_deadline(deadline: Instant) -> Option { + let now = Instant::now(); + if now > deadline { + None + } else { + Some(deadline - now) + } + } + + // deadline to get the task from the queue + let deadline = Instant::now() + ::api::PRIORITY_TIMER_INTERVAL; + let mut work = || { + let task = { + let tasks = self.priority_tasks.try_lock_until(deadline)?; + let left = check_deadline(deadline)?; + tasks.recv_timeout(left).ok()? + }; + task.starting(); + // wait for the sync lock until deadline, + // note we might drop the task here if we won't manage to acquire the lock. + let mut sync = self.sync.try_write_until(deadline)?; + // since we already have everything let's use a different deadline + // to do the rest of the job now, so that previous work is not wasted. + let deadline = Instant::now() + PRIORITY_TASK_DEADLINE; + let as_ms = move |prev| { + let dur: Duration = Instant::now() - prev; + dur.as_secs() * 1_000 + dur.subsec_millis() as u64 + }; + match task { + // NOTE We can't simply use existing methods, + // cause the block is not in the DB yet. + PriorityTask::PropagateBlock { started, block, hash, difficulty } => { + // try to send to peers that are on the same block as us + // (they will most likely accept the new block). + let chain_info = io.chain().chain_info(); + let total_difficulty = chain_info.total_difficulty + difficulty; + let rlp = ChainSync::create_block_rlp(&block, total_difficulty); + for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) { + check_deadline(deadline)?; + for peer in peers { + SyncPropagator::send_packet(io, *peer, NEW_BLOCK_PACKET, rlp.clone()); + if let Some(ref mut peer) = sync.peers.get_mut(peer) { + peer.latest_hash = hash; + } + } + } + debug!(target: "sync", "Finished block propagation, took {}ms", as_ms(started)); + }, + PriorityTask::PropagateTransactions(time, _) => { + SyncPropagator::propagate_new_transactions(&mut sync, io, || { + check_deadline(deadline).is_some() + }); + debug!(target: "sync", "Finished transaction propagation, took {}ms", as_ms(time)); + }, + } + + Some(()) + }; + + // Process as many items as we can until the deadline is reached. + loop { + if work().is_none() { + return; + } + } + } +} + +// Static methods +impl ChainSync { + /// creates rlp to send for the tree defined by 'from' and 'to' hashes + fn create_new_hashes_rlp(chain: &BlockChainClient, from: &H256, to: &H256) -> Option { + match chain.tree_route(from, to) { + Some(route) => { + let uncles = chain.find_uncles(from).unwrap_or_else(Vec::new); + match route.blocks.len() { + 0 => None, + _ => { + let mut blocks = route.blocks; + blocks.extend(uncles); + let mut rlp_stream = RlpStream::new_list(blocks.len()); + for block_hash in blocks { + let mut hash_rlp = RlpStream::new_list(2); + let number = chain.block_header(BlockId::Hash(block_hash.clone())) + .expect("chain.tree_route and chain.find_uncles only return hahses of blocks that are in the blockchain. qed.").number(); + hash_rlp.append(&block_hash); + hash_rlp.append(&number); + rlp_stream.append_raw(hash_rlp.as_raw(), 1); + } + Some(rlp_stream.out()) + } + } + }, + None => None + } + } + + /// creates rlp from block bytes and total difficulty + fn create_block_rlp(bytes: &Bytes, total_difficulty: U256) -> Bytes { + let mut rlp_stream = RlpStream::new_list(2); + rlp_stream.append_raw(bytes, 1); + rlp_stream.append(&total_difficulty); + rlp_stream.out() + } + + /// creates latest block rlp for the given client + fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes { + Self::create_block_rlp( + &chain.block(BlockId::Hash(chain.chain_info().best_block_hash)) + .expect("Best block always exists").into_inner(), + chain.chain_info().total_difficulty + ) + } + + /// creates given hash block rlp for the given client + fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes { + Self::create_block_rlp( + &chain.block(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed").into_inner(), + chain.block_total_difficulty(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed.") + ) + } + + fn select_random_peers(peers: &[PeerId]) -> Vec { + // take sqrt(x) peers + let mut peers = peers.to_vec(); + let mut count = (peers.len() as f64).powf(0.5).round() as usize; + count = cmp::min(count, MAX_PEERS_PROPAGATION); + count = cmp::max(count, MIN_PEERS_PROPAGATION); + random::new().shuffle(&mut peers); + peers.truncate(count); + peers + } + + fn get_init_state(warp_sync: WarpSync, chain: &BlockChainClient) -> SyncState { + let best_block = chain.chain_info().best_block_number; + match warp_sync { + WarpSync::Enabled => SyncState::WaitingPeers, + WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers, + _ => SyncState::Idle, + } + } +} + +/// A peer query method for getting a list of peers +enum PeerState { + /// Peer is on different hash than us + Lagging, + /// Peer is on the same block as us + SameBlock +} + /// Blockchain sync handler. /// See module documentation for more details. pub struct ChainSync { @@ -417,10 +633,14 @@ pub struct ChainSync { impl ChainSync { /// Create a new instance of syncing strategy. - pub fn new(config: SyncConfig, chain: &BlockChainClient, private_tx_handler: Arc) -> ChainSync { + pub fn new( + config: SyncConfig, + chain: &BlockChainClient, + private_tx_handler: Arc, + ) -> Self { let chain_info = chain.chain_info(); let best_block = chain.chain_info().best_block_number; - let state = ChainSync::get_init_state(config.warp_sync, chain); + let state = Self::get_init_state(config.warp_sync, chain); let mut sync = ChainSync { state, @@ -445,15 +665,6 @@ impl ChainSync { sync } - fn get_init_state(warp_sync: WarpSync, chain: &BlockChainClient) -> SyncState { - let best_block = chain.chain_info().best_block_number; - match warp_sync { - WarpSync::Enabled => SyncState::WaitingPeers, - WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers, - _ => SyncState::Idle, - } - } - /// Returns synchonization status pub fn status(&self) -> SyncStatus { let last_imported_number = self.new_blocks.last_imported_block_number(); @@ -521,7 +732,7 @@ impl ChainSync { } } } - self.state = state.unwrap_or_else(|| ChainSync::get_init_state(self.warp_sync, io.chain())); + self.state = state.unwrap_or_else(|| Self::get_init_state(self.warp_sync, io.chain())); // Reactivate peers only if some progress has been made // since the last sync round of if starting fresh. self.active_peers = self.peers.keys().cloned().collect(); @@ -1004,67 +1215,24 @@ impl ChainSync { } } - /// creates rlp to send for the tree defined by 'from' and 'to' hashes - fn create_new_hashes_rlp(chain: &BlockChainClient, from: &H256, to: &H256) -> Option { - match chain.tree_route(from, to) { - Some(route) => { - let uncles = chain.find_uncles(from).unwrap_or_else(Vec::new); - match route.blocks.len() { - 0 => None, - _ => { - let mut blocks = route.blocks; - blocks.extend(uncles); - let mut rlp_stream = RlpStream::new_list(blocks.len()); - for block_hash in blocks { - let mut hash_rlp = RlpStream::new_list(2); - let number = chain.block_header(BlockId::Hash(block_hash.clone())) - .expect("chain.tree_route and chain.find_uncles only return hahses of blocks that are in the blockchain. qed.").number(); - hash_rlp.append(&block_hash); - hash_rlp.append(&number); - rlp_stream.append_raw(hash_rlp.as_raw(), 1); - } - Some(rlp_stream.out()) - } - } - }, - None => None - } + /// returns peer ids that have different block than our chain + fn get_lagging_peers(&self, chain_info: &BlockChainInfo) -> Vec { + self.get_peers(chain_info, PeerState::Lagging) } - /// creates rlp from block bytes and total difficulty - fn create_block_rlp(bytes: &Bytes, total_difficulty: U256) -> Bytes { - let mut rlp_stream = RlpStream::new_list(2); - rlp_stream.append_raw(bytes, 1); - rlp_stream.append(&total_difficulty); - rlp_stream.out() - } - - /// creates latest block rlp for the given client - fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes { - ChainSync::create_block_rlp( - &chain.block(BlockId::Hash(chain.chain_info().best_block_hash)) - .expect("Best block always exists").into_inner(), - chain.chain_info().total_difficulty - ) - } - - /// creates given hash block rlp for the given client - fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes { - ChainSync::create_block_rlp( - &chain.block(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed").into_inner(), - chain.block_total_difficulty(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed.") - ) - } - - /// returns peer ids that have different blocks than our chain - fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo) -> Vec { + /// returns peer ids that have different or the same blocks than our chain + fn get_peers(&self, chain_info: &BlockChainInfo, peers: PeerState) -> Vec { let latest_hash = chain_info.best_block_hash; self .peers - .iter_mut() + .iter() .filter_map(|(&id, ref mut peer_info)| { trace!(target: "sync", "Checking peer our best {} their best {}", latest_hash, peer_info.latest_hash); - if peer_info.latest_hash != latest_hash { + let matches = match peers { + PeerState::Lagging => peer_info.latest_hash != latest_hash, + PeerState::SameBlock => peer_info.latest_hash == latest_hash, + }; + if matches { Some(id) } else { None @@ -1073,17 +1241,6 @@ impl ChainSync { .collect::>() } - fn select_random_peers(peers: &[PeerId]) -> Vec { - // take sqrt(x) peers - let mut peers = peers.to_vec(); - let mut count = (peers.len() as f64).powf(0.5).round() as usize; - count = cmp::min(count, MAX_PEERS_PROPAGATION); - count = cmp::max(count, MIN_PEERS_PROPAGATION); - random::new().shuffle(&mut peers); - peers.truncate(count); - peers - } - fn get_consensus_peers(&self) -> Vec { self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2.0 { Some(*id) } else { None }).collect() } @@ -1132,21 +1289,10 @@ impl ChainSync { } } - /// Dispatch incoming requests and responses - pub fn dispatch_packet(sync: &RwLock, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { - SyncSupplier::dispatch_packet(sync, io, peer, packet_id, data) - } - pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { - debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id); SyncHandler::on_packet(self, io, peer, packet_id, data); } - /// Called when peer sends us new consensus packet - pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { - SyncHandler::on_consensus_packet(io, peer_id, r) - } - /// Called by peer when it is disconnecting pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { SyncHandler::on_peer_aborting(self, io, peer); @@ -1158,8 +1304,16 @@ impl ChainSync { } /// propagates new transactions to all peers - pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize { - SyncPropagator::propagate_new_transactions(self, io) + pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) { + let deadline = Instant::now() + Duration::from_millis(500); + SyncPropagator::propagate_new_transactions(self, io, || { + if deadline > Instant::now() { + true + } else { + debug!(target: "sync", "Wasn't able to finish transaction propagation within a deadline."); + false + } + }); } /// Broadcast consensus message to peers. @@ -1175,7 +1329,7 @@ impl ChainSync { #[cfg(test)] pub mod tests { - use std::collections::{HashSet, VecDeque}; + use std::collections::{VecDeque}; use ethkey; use network::PeerId; use tests::helpers::{TestIo}; @@ -1291,8 +1445,8 @@ pub mod tests { asking_blocks: Vec::new(), asking_hash: None, ask_time: Instant::now(), - last_sent_transactions: HashSet::new(), - last_sent_private_transactions: HashSet::new(), + last_sent_transactions: Default::default(), + last_sent_private_transactions: Default::default(), expired: false, confirmation: super::ForkConfirmation::Confirmed, snapshot_number: None, @@ -1307,7 +1461,7 @@ pub mod tests { fn finds_lagging_peers() { let mut client = TestBlockChainClient::new(); client.add_blocks(100, EachBlockWith::Uncle); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client); + let sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client); let chain_info = client.chain_info(); let lagging_peers = sync.get_lagging_peers(&chain_info); @@ -1447,3 +1601,4 @@ pub mod tests { assert_eq!(status.status.transaction_count, 0); } } + diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 82b592c4b..689ccfc02 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -18,6 +18,7 @@ use bytes::Bytes; use ethereum_types::H256; use ethcore::client::BlockChainInfo; use ethcore::header::BlockNumber; +use fastmap::H256FastSet; use network::{PeerId, PacketId}; use rand::Rng; use rlp::{Encodable, RlpStream}; @@ -69,49 +70,51 @@ impl SyncPropagator { /// propagates latest block to a set of peers pub fn propagate_blocks(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut SyncIo, blocks: &[H256], peers: &[PeerId]) -> usize { trace!(target: "sync", "Sending NewBlocks to {:?}", peers); - let mut sent = 0; - for peer_id in peers { - if blocks.is_empty() { - let rlp = ChainSync::create_latest_block_rlp(io.chain()); - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp); - } else { - for h in blocks { - let rlp = ChainSync::create_new_block_rlp(io.chain(), h); - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp); + let sent = peers.len(); + let mut send_packet = |io: &mut SyncIo, rlp: Bytes| { + for peer_id in peers { + SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); + if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { + peer.latest_hash = chain_info.best_block_hash.clone(); } } - if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { - peer.latest_hash = chain_info.best_block_hash.clone(); + }; + + if blocks.is_empty() { + let rlp = ChainSync::create_latest_block_rlp(io.chain()); + send_packet(io, rlp); + } else { + for h in blocks { + let rlp = ChainSync::create_new_block_rlp(io.chain(), h); + send_packet(io, rlp); } - sent += 1; } + sent } /// propagates new known hashes to all peers pub fn propagate_new_hashes(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[PeerId]) -> usize { trace!(target: "sync", "Sending NewHashes to {:?}", peers); - let mut sent = 0; let last_parent = *io.chain().best_block_header().parent_hash(); + let best_block_hash = chain_info.best_block_hash; + let rlp = match ChainSync::create_new_hashes_rlp(io.chain(), &last_parent, &best_block_hash) { + Some(rlp) => rlp, + None => return 0 + }; + + let sent = peers.len(); for peer_id in peers { - sent += match ChainSync::create_new_hashes_rlp(io.chain(), &last_parent, &chain_info.best_block_hash) { - Some(rlp) => { - { - if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { - peer.latest_hash = chain_info.best_block_hash.clone(); - } - } - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp); - 1 - }, - None => 0 + if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { + peer.latest_hash = best_block_hash; } + SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone()); } sent } /// propagates new transactions to all peers - pub fn propagate_new_transactions(sync: &mut ChainSync, io: &mut SyncIo) -> usize { + pub fn propagate_new_transactions bool>(sync: &mut ChainSync, io: &mut SyncIo, mut should_continue: F) -> usize { // Early out if nobody to send to. if sync.peers.is_empty() { return 0; @@ -122,6 +125,10 @@ impl SyncPropagator { return 0; } + if !should_continue() { + return 0; + } + let (transactions, service_transactions): (Vec<_>, Vec<_>) = transactions.iter() .map(|tx| tx.signed()) .partition(|tx| !tx.gas_price.is_zero()); @@ -130,24 +137,34 @@ impl SyncPropagator { let mut affected_peers = HashSet::new(); if !transactions.is_empty() { let peers = SyncPropagator::select_peers_for_transactions(sync, |_| true); - affected_peers = SyncPropagator::propagate_transactions_to_peers(sync, io, peers, transactions); + affected_peers = SyncPropagator::propagate_transactions_to_peers( + sync, io, peers, transactions, &mut should_continue, + ); } // most of times service_transactions will be empty // => there's no need to merge packets if !service_transactions.is_empty() { let service_transactions_peers = SyncPropagator::select_peers_for_transactions(sync, |peer_id| accepts_service_transaction(&io.peer_info(*peer_id))); - let service_transactions_affected_peers = SyncPropagator::propagate_transactions_to_peers(sync, io, service_transactions_peers, service_transactions); + let service_transactions_affected_peers = SyncPropagator::propagate_transactions_to_peers( + sync, io, service_transactions_peers, service_transactions, &mut should_continue + ); affected_peers.extend(&service_transactions_affected_peers); } affected_peers.len() } - fn propagate_transactions_to_peers(sync: &mut ChainSync, io: &mut SyncIo, peers: Vec, transactions: Vec<&SignedTransaction>) -> HashSet { + fn propagate_transactions_to_peers bool>( + sync: &mut ChainSync, + io: &mut SyncIo, + peers: Vec, + transactions: Vec<&SignedTransaction>, + mut should_continue: F, + ) -> HashSet { let all_transactions_hashes = transactions.iter() .map(|tx| tx.hash()) - .collect::>(); + .collect::(); let all_transactions_rlp = { let mut packet = RlpStream::new_list(transactions.len()); for tx in &transactions { packet.append(&**tx); } @@ -157,102 +174,104 @@ impl SyncPropagator { // Clear old transactions from stats sync.transactions_stats.retain(&all_transactions_hashes); - // sqrt(x)/x scaled to max u32 - let block_number = io.chain().chain_info().best_block_number; - - let lucky_peers = { - peers.into_iter() - .filter_map(|peer_id| { - let stats = &mut sync.transactions_stats; - let peer_info = sync.peers.get_mut(&peer_id) - .expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed"); - - // Send all transactions - if peer_info.last_sent_transactions.is_empty() { - // update stats - for hash in &all_transactions_hashes { - let id = io.peer_session_info(peer_id).and_then(|info| info.id); - stats.propagated(hash, id, block_number); - } - peer_info.last_sent_transactions = all_transactions_hashes.clone(); - return Some((peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone())); - } - - // Get hashes of all transactions to send to this peer - let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions) - .cloned() - .collect::>(); - if to_send.is_empty() { - return None; - } - - // Construct RLP - let (packet, to_send) = { - let mut to_send = to_send; - let mut packet = RlpStream::new(); - packet.begin_unbounded_list(); - let mut pushed = 0; - for tx in &transactions { - let hash = tx.hash(); - if to_send.contains(&hash) { - let mut transaction = RlpStream::new(); - tx.rlp_append(&mut transaction); - let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE); - if !appended { - // Maximal packet size reached just proceed with sending - debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len()); - to_send = to_send.into_iter().take(pushed).collect(); - break; - } - pushed += 1; - } - } - packet.complete_unbounded_list(); - (packet, to_send) - }; - - // Update stats - let id = io.peer_session_info(peer_id).and_then(|info| info.id); - for hash in &to_send { - // update stats - stats.propagated(hash, id, block_number); - } - - peer_info.last_sent_transactions = all_transactions_hashes - .intersection(&peer_info.last_sent_transactions) - .chain(&to_send) - .cloned() - .collect(); - Some((peer_id, to_send.len(), packet.out())) - }) - .collect::>() + let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| { + let size = rlp.len(); + SyncPropagator::send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp); + trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size); }; - // Send RLPs - let mut peers = HashSet::new(); - if lucky_peers.len() > 0 { - let mut max_sent = 0; - let lucky_peers_len = lucky_peers.len(); - for (peer_id, sent, rlp) in lucky_peers { - peers.insert(peer_id); - let size = rlp.len(); - SyncPropagator::send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp); - trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size); - max_sent = cmp::max(max_sent, sent); + let block_number = io.chain().chain_info().best_block_number; + let mut sent_to_peers = HashSet::new(); + let mut max_sent = 0; + + // for every peer construct and send transactions packet + for peer_id in peers { + if !should_continue() { + debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, sent_to_peers.len()); + return sent_to_peers; } - debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, lucky_peers_len); + + let stats = &mut sync.transactions_stats; + let peer_info = sync.peers.get_mut(&peer_id) + .expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed"); + + // Send all transactions, if the peer doesn't know about anything + if peer_info.last_sent_transactions.is_empty() { + // update stats + for hash in &all_transactions_hashes { + let id = io.peer_session_info(peer_id).and_then(|info| info.id); + stats.propagated(hash, id, block_number); + } + peer_info.last_sent_transactions = all_transactions_hashes.clone(); + + send_packet(io, peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone()); + sent_to_peers.insert(peer_id); + max_sent = cmp::max(max_sent, all_transactions_hashes.len()); + continue; + } + + // Get hashes of all transactions to send to this peer + let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions) + .cloned() + .collect::>(); + if to_send.is_empty() { + continue; + } + + // Construct RLP + let (packet, to_send) = { + let mut to_send = to_send; + let mut packet = RlpStream::new(); + packet.begin_unbounded_list(); + let mut pushed = 0; + for tx in &transactions { + let hash = tx.hash(); + if to_send.contains(&hash) { + let mut transaction = RlpStream::new(); + tx.rlp_append(&mut transaction); + let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE); + if !appended { + // Maximal packet size reached just proceed with sending + debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len()); + to_send = to_send.into_iter().take(pushed).collect(); + break; + } + pushed += 1; + } + } + packet.complete_unbounded_list(); + (packet, to_send) + }; + + // Update stats + let id = io.peer_session_info(peer_id).and_then(|info| info.id); + for hash in &to_send { + // update stats + stats.propagated(hash, id, block_number); + } + + peer_info.last_sent_transactions = all_transactions_hashes + .intersection(&peer_info.last_sent_transactions) + .chain(&to_send) + .cloned() + .collect(); + send_packet(io, peer_id, to_send.len(), packet.out()); + sent_to_peers.insert(peer_id); + max_sent = cmp::max(max_sent, to_send.len()); + } - peers + debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, sent_to_peers.len()); + sent_to_peers } pub fn propagate_latest_blocks(sync: &mut ChainSync, io: &mut SyncIo, sealed: &[H256]) { let chain_info = io.chain().chain_info(); if (((chain_info.best_block_number as i64) - (sync.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { - let mut peers = sync.get_lagging_peers(&chain_info); + let peers = sync.get_lagging_peers(&chain_info); if sealed.is_empty() { let hashes = SyncPropagator::propagate_new_hashes(sync, &chain_info, io, &peers); - peers = ChainSync::select_random_peers(&peers); + let peers = ChainSync::select_random_peers(&peers); let blocks = SyncPropagator::propagate_blocks(sync, &chain_info, io, sealed, &peers); if blocks != 0 || hashes != 0 { trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); @@ -318,7 +337,7 @@ impl SyncPropagator { } /// Generic packet sender - fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { + pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { if let Err(e) = sync.send(peer_id, packet_id, packet) { debug!(target:"sync", "Error sending packet: {:?}", e); sync.disconnect_peer(peer_id); @@ -419,8 +438,8 @@ mod tests { asking_blocks: Vec::new(), asking_hash: None, ask_time: Instant::now(), - last_sent_transactions: HashSet::new(), - last_sent_private_transactions: HashSet::new(), + last_sent_transactions: Default::default(), + last_sent_private_transactions: Default::default(), expired: false, confirmation: ForkConfirmation::Confirmed, snapshot_number: None, @@ -447,13 +466,13 @@ mod tests { let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // Try to propagate same transactions for the second time - let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // Even after new block transactions should not be propagated twice sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]); // Try to propagate same transactions for the third time - let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // 1 message should be send assert_eq!(1, io.packets.len()); @@ -474,7 +493,7 @@ mod tests { let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); io.chain.insert_transaction_to_queue(); // New block import should not trigger propagation. // (we only propagate on timeout) @@ -498,10 +517,10 @@ mod tests { let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]); // Try to propagate same transactions for the second time - let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); assert_eq!(0, io.packets.len()); assert_eq!(0, peer_count); @@ -519,7 +538,7 @@ mod tests { // should sent some { let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); assert_eq!(1, io.packets.len()); assert_eq!(1, peer_count); } @@ -528,9 +547,9 @@ mod tests { let (peer_count2, peer_count3) = { let mut io = TestIo::new(&mut client, &ss, &queue, None); // Propagate new transactions - let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // And now the peer should have all transactions - let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); (peer_count2, peer_count3) }; @@ -553,7 +572,7 @@ mod tests { let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); let stats = sync.transactions_stats(); assert_eq!(stats.len(), 1, "Should maintain stats for single transaction.") @@ -583,7 +602,7 @@ mod tests { io.peers_info.insert(4, "Parity-Ethereum/v2.7.3-ABCDEFGH".to_owned()); // and new service transaction is propagated to peers - SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // peer#2 && peer#4 are receiving service transaction assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 2)); // TRANSACTIONS_PACKET @@ -607,7 +626,7 @@ mod tests { io.peers_info.insert(1, "Parity-Ethereum/v2.6".to_owned()); // and service + non-service transactions are propagated to peers - SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // two separate packets for peer are queued: // 1) with non-service-transaction diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index 4bce0ef98..eaee584ca 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -27,6 +27,7 @@ use sync_io::SyncIo; use super::{ ChainSync, + SyncHandler, RlpResponseResult, PacketDecodeError, BLOCK_BODIES_PACKET, @@ -47,6 +48,8 @@ use super::{ RECEIPTS_PACKET, SNAPSHOT_DATA_PACKET, SNAPSHOT_MANIFEST_PACKET, + STATUS_PACKET, + TRANSACTIONS_PACKET, }; /// The Chain Sync Supplier: answers requests from peers with available data @@ -56,6 +59,7 @@ impl SyncSupplier { /// Dispatch incoming requests and responses pub fn dispatch_packet(sync: &RwLock, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { let rlp = Rlp::new(data); + let result = match packet_id { GET_BLOCK_BODIES_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, SyncSupplier::return_block_bodies, @@ -80,9 +84,39 @@ impl SyncSupplier { GET_SNAPSHOT_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, SyncSupplier::return_snapshot_data, |e| format!("Error sending snapshot data: {:?}", e)), - CONSENSUS_DATA_PACKET => ChainSync::on_consensus_packet(io, peer, &rlp), - _ => { + + STATUS_PACKET => { sync.write().on_packet(io, peer, packet_id, data); + Ok(()) + }, + // Packets that require the peer to be confirmed + _ => { + if !sync.read().peers.contains_key(&peer) { + debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer)); + return; + } + debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id); + + match packet_id { + CONSENSUS_DATA_PACKET => { + SyncHandler::on_consensus_packet(io, peer, &rlp) + }, + TRANSACTIONS_PACKET => { + let res = { + let sync_ro = sync.read(); + SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp) + }; + if res.is_err() { + // peer sent invalid data, disconnect. + io.disable_peer(peer); + sync.write().deactivate_peer(io, peer); + } + }, + _ => { + sync.write().on_packet(io, peer, packet_id, data); + } + } + Ok(()) } }; @@ -404,7 +438,7 @@ mod test { io.sender = Some(2usize); - ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_NODE_DATA_PACKET, &node_request); + SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_NODE_DATA_PACKET, &node_request); assert_eq!(1, io.packets.len()); } @@ -446,7 +480,7 @@ mod test { assert_eq!(603, rlp_result.unwrap().1.out().len()); io.sender = Some(2usize); - ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_RECEIPTS_PACKET, &receipts_request); + SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_RECEIPTS_PACKET, &receipts_request); assert_eq!(1, io.packets.len()); } } diff --git a/ethcore/sync/src/sync_io.rs b/ethcore/sync/src/sync_io.rs index c7704724c..a5e9f7b2f 100644 --- a/ethcore/sync/src/sync_io.rs +++ b/ethcore/sync/src/sync_io.rs @@ -52,7 +52,7 @@ pub trait SyncIo { fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8; /// Returns if the chain block queue empty fn is_chain_queue_empty(&self) -> bool { - self.chain().queue_info().is_empty() + self.chain().is_queue_empty() } /// Check if the session is expired fn is_expired(&self) -> bool; diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index d75d71ea9..d83597e52 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -33,7 +33,7 @@ use ethcore::test_helpers; use sync_io::SyncIo; use io::{IoChannel, IoContext, IoHandler}; use api::WARP_SYNC_PROTOCOL_ID; -use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET}; +use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET, SyncSupplier}; use SyncConfig; use private_tx::SimplePrivateTxHandler; @@ -271,7 +271,7 @@ impl Peer for EthPeer { fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet { let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(from)); - ChainSync::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data); + SyncSupplier::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data); self.chain.flush(); io.to_disconnect.clone() } diff --git a/ethcore/sync/src/transactions_stats.rs b/ethcore/sync/src/transactions_stats.rs index 4d11dcf68..7d5e2ca4a 100644 --- a/ethcore/sync/src/transactions_stats.rs +++ b/ethcore/sync/src/transactions_stats.rs @@ -15,6 +15,7 @@ // along with Parity. If not, see . use api::TransactionStats; +use std::hash::BuildHasher; use std::collections::{HashSet, HashMap}; use ethereum_types::{H256, H512}; use fastmap::H256FastMap; @@ -74,7 +75,7 @@ impl TransactionsStats { } /// Retains only transactions present in given `HashSet`. - pub fn retain(&mut self, hashes: &HashSet) { + pub fn retain(&mut self, hashes: &HashSet) { let to_remove = self.pending_transactions.keys() .filter(|hash| !hashes.contains(hash)) .cloned() diff --git a/miner/src/pool/listener.rs b/miner/src/pool/listener.rs index 5776ba845..f5e76e15d 100644 --- a/miner/src/pool/listener.rs +++ b/miner/src/pool/listener.rs @@ -50,6 +50,10 @@ impl Notifier { /// Notify listeners about all currently pending transactions. pub fn notify(&mut self) { + if self.pending.is_empty() { + return; + } + for l in &self.listeners { (l)(&self.pending); } diff --git a/parity/modules.rs b/parity/modules.rs index e12e8ee45..ac84aea5f 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::sync::Arc; +use std::sync::{Arc, mpsc}; use ethcore::client::BlockChainClient; use sync::{self, AttachedProtocol, SyncConfig, NetworkConfiguration, Params, ConnectionFilter}; @@ -25,12 +25,17 @@ pub use sync::{EthSync, SyncProvider, ManageNetwork, PrivateTxHandler}; pub use ethcore::client::ChainNotify; use ethcore_logger::Config as LogConfig; -pub type SyncModules = (Arc, Arc, Arc); +pub type SyncModules = ( + Arc, + Arc, + Arc, + mpsc::Sender, +); pub fn sync( - sync_cfg: SyncConfig, - net_cfg: NetworkConfiguration, - client: Arc, + config: SyncConfig, + network_config: NetworkConfiguration, + chain: Arc, snapshot_service: Arc, private_tx_handler: Arc, provider: Arc, @@ -39,15 +44,20 @@ pub fn sync( connection_filter: Option>, ) -> Result { let eth_sync = EthSync::new(Params { - config: sync_cfg, - chain: client, - provider: provider, - snapshot_service: snapshot_service, + config, + chain, + provider, + snapshot_service, private_tx_handler, - network_config: net_cfg, - attached_protos: attached_protos, + network_config, + attached_protos, }, connection_filter)?; - Ok((eth_sync.clone() as Arc, eth_sync.clone() as Arc, eth_sync.clone() as Arc)) + Ok(( + eth_sync.clone() as Arc, + eth_sync.clone() as Arc, + eth_sync.clone() as Arc, + eth_sync.priority_tasks() + )) } diff --git a/parity/run.rs b/parity/run.rs index 4d5d371b4..39712d702 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use std::any::Any; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, Weak, atomic}; use std::time::{Duration, Instant}; use std::thread; @@ -480,7 +480,6 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: cmd.gas_pricer_conf.to_gas_pricer(fetch.clone(), runtime.executor()), &spec, Some(account_provider.clone()), - )); miner.set_author(cmd.miner_extras.author, None).expect("Fails only if password is Some; password is None; qed"); miner.set_gas_range_target(cmd.miner_extras.gas_range_target); @@ -637,7 +636,7 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: }; // create sync object - let (sync_provider, manage_network, chain_notify) = modules::sync( + let (sync_provider, manage_network, chain_notify, priority_tasks) = modules::sync( sync_config, net_conf.clone().into(), client.clone(), @@ -651,6 +650,18 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: service.add_notify(chain_notify.clone()); + // Propagate transactions as soon as they are imported. + let tx = ::parking_lot::Mutex::new(priority_tasks); + let is_ready = Arc::new(atomic::AtomicBool::new(true)); + miner.add_transactions_listener(Box::new(move |_hashes| { + // we want to have only one PendingTransactions task in the queue. + if is_ready.compare_and_swap(true, false, atomic::Ordering::SeqCst) { + let task = ::sync::PriorityTask::PropagateTransactions(Instant::now(), is_ready.clone()); + // we ignore error cause it means that we are closing + let _ = tx.lock().send(task); + } + })); + // provider not added to a notification center is effectively disabled // TODO [debris] refactor it later on if cmd.private_tx_enabled { @@ -737,7 +748,7 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: let secretstore_deps = secretstore::Dependencies { client: client.clone(), sync: sync_provider.clone(), - miner: miner, + miner: miner.clone(), account_provider: account_provider, accounts_passwords: &passwords, }; diff --git a/util/fastmap/src/lib.rs b/util/fastmap/src/lib.rs index 135ce54ba..65dd9dfb4 100644 --- a/util/fastmap/src/lib.rs +++ b/util/fastmap/src/lib.rs @@ -21,11 +21,13 @@ extern crate plain_hasher; use ethereum_types::H256; use std::hash; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use plain_hasher::PlainHasher; /// Specialized version of `HashMap` with H256 keys and fast hashing function. pub type H256FastMap = HashMap>; +/// Specialized version of HashSet with H256 values and fast hashing function. +pub type H256FastSet = HashSet>; #[cfg(test)] mod tests { @@ -36,4 +38,4 @@ mod tests { let mut h = H256FastMap::default(); h.insert(H256::from(123), "abc"); } -} \ No newline at end of file +}