diff --git a/crates/ethcore/sync/src/chain/mod.rs b/crates/ethcore/sync/src/chain/mod.rs index 60596233e..f6e6114ae 100644 --- a/crates/ethcore/sync/src/chain/mod.rs +++ b/crates/ethcore/sync/src/chain/mod.rs @@ -108,14 +108,17 @@ use ethereum_types::{H256, U256}; use fastmap::{H256FastMap, H256FastSet}; use hash::keccak; use network::{self, client_version::ClientVersion, PeerId}; -use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; +use parking_lot::Mutex; use rand::Rng; use rlp::{DecoderError, RlpStream}; use snapshot::Snapshot; use std::{ cmp, collections::{BTreeMap, HashMap, HashSet}, - sync::mpsc, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc, RwLock as StdRwLock, RwLockWriteGuard as StdRwLockWriteGuard, + }, time::{Duration, Instant}, }; use sync_io::SyncIo; @@ -398,8 +401,10 @@ pub type Peers = HashMap; pub struct ChainSyncApi { /// Priority tasks queue priority_tasks: Mutex>, + /// Gate for executing only one priority timer. + priority_tasks_gate: AtomicBool, /// The rest of sync data - sync: RwLock, + sync: StdRwLock, } impl ChainSyncApi { @@ -411,31 +416,33 @@ impl ChainSyncApi { priority_tasks: mpsc::Receiver, ) -> Self { ChainSyncApi { - sync: RwLock::new(ChainSync::new(config, chain, fork_filter)), + sync: StdRwLock::new(ChainSync::new(config, chain, fork_filter)), priority_tasks: Mutex::new(priority_tasks), + priority_tasks_gate: AtomicBool::new(false), } } /// Gives `write` access to underlying `ChainSync` - pub fn write(&self) -> RwLockWriteGuard { - self.sync.write() + pub fn write(&self) -> StdRwLockWriteGuard { + self.sync.write().unwrap() } /// Returns info about given list of peers pub fn peer_info(&self, ids: &[PeerId]) -> Vec> { - let sync = self.sync.read(); + let sync = self.sync.read().unwrap(); ids.iter().map(|id| sync.peer_info(id)).collect() } /// Returns synchonization status pub fn status(&self) -> SyncStatus { - self.sync.read().status() + self.sync.read().unwrap().status() } /// Returns transactions propagation statistics pub fn transactions_stats(&self) -> BTreeMap { self.sync .read() + .unwrap() .transactions_stats() .iter() .map(|(hash, stats)| (*hash, stats.into())) @@ -449,7 +456,7 @@ impl ChainSyncApi { /// Process the queue with requests, that were delayed with response. pub fn process_delayed_requests(&self, io: &mut dyn SyncIo) { - let requests = self.sync.write().retrieve_delayed_requests(); + let requests = self.sync.write().unwrap().retrieve_delayed_requests(); if !requests.is_empty() { debug!(target: "sync", "Processing {} delayed requests", requests.len()); for (peer_id, packet_id, packet_data) in requests { @@ -480,6 +487,13 @@ impl ChainSyncApi { } } + if self + .priority_tasks_gate + .compare_and_swap(false, true, Ordering::AcqRel) + { + return; + } + // deadline to get the task from the queue let deadline = Instant::now() + ::api::PRIORITY_TIMER_INTERVAL; let mut work = || { @@ -489,9 +503,8 @@ impl ChainSyncApi { tasks.recv_timeout(left).ok()? }; task.starting(); - // wait for the sync lock until deadline, - // note we might drop the task here if we won't manage to acquire the lock. - let mut sync = self.sync.try_write_until(deadline)?; + // wait for the sync lock + let mut sync = self.sync.write().unwrap(); // since we already have everything let's use a different deadline // to do the rest of the job now, so that previous work is not wasted. let deadline = Instant::now() + PRIORITY_TASK_DEADLINE; @@ -538,6 +551,7 @@ impl ChainSyncApi { // Process as many items as we can until the deadline is reached. loop { if work().is_none() { + self.priority_tasks_gate.store(false, Ordering::Release); return; } } diff --git a/crates/ethcore/sync/src/chain/supplier.rs b/crates/ethcore/sync/src/chain/supplier.rs index 4c6092cd2..3746819d5 100644 --- a/crates/ethcore/sync/src/chain/supplier.rs +++ b/crates/ethcore/sync/src/chain/supplier.rs @@ -24,9 +24,8 @@ pub const PAYLOAD_SOFT_LIMIT: usize = 100_000; use enum_primitive::FromPrimitive; use ethereum_types::H256; use network::{self, PeerId}; -use parking_lot::RwLock; use rlp::{Rlp, RlpStream}; -use std::cmp; +use std::{cmp, sync::RwLock as StdRwLock}; use types::{ids::BlockId, BlockNumber}; use sync_io::SyncIo; @@ -54,7 +53,7 @@ impl SyncSupplier { // Take a u8 and not a SyncPacketId because this is the entry point // to chain sync from the outside world. pub fn dispatch_packet( - sync: &RwLock, + sync: &StdRwLock, io: &mut dyn SyncIo, peer: PeerId, packet_id: u8, @@ -102,12 +101,12 @@ impl SyncSupplier { ), StatusPacket => { - sync.write().on_packet(io, peer, packet_id, data); + sync.write().unwrap().on_packet(io, peer, packet_id, data); Ok(()) } // Packets that require the peer to be confirmed _ => { - if !sync.read().peers.contains_key(&peer) { + if !sync.read().unwrap().peers.contains_key(&peer) { debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_version(peer)); return; } @@ -117,17 +116,17 @@ impl SyncSupplier { ConsensusDataPacket => SyncHandler::on_consensus_packet(io, peer, &rlp), TransactionsPacket => { let res = { - let sync_ro = sync.read(); + let sync_ro = sync.read().unwrap(); SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp) }; if res.is_err() { // peer sent invalid data, disconnect. io.disable_peer(peer); - sync.write().deactivate_peer(io, peer); + sync.write().unwrap().deactivate_peer(io, peer); } } _ => { - sync.write().on_packet(io, peer, packet_id, data); + sync.write().unwrap().on_packet(io, peer, packet_id, data); } } @@ -139,9 +138,10 @@ impl SyncSupplier { Err(PacketProcessError::Decoder(e)) => { debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e) } - Err(PacketProcessError::ClientBusy) => { - sync.write().add_delayed_request(peer, packet_id, data) - } + Err(PacketProcessError::ClientBusy) => sync + .write() + .unwrap() + .add_delayed_request(peer, packet_id, data), Ok(()) => {} } } @@ -150,7 +150,7 @@ impl SyncSupplier { /// Dispatch delayed request /// The main difference with dispatch packet is the direct send of the responses to the peer pub fn dispatch_delayed_request( - sync: &RwLock, + sync: &StdRwLock, io: &mut dyn SyncIo, peer: PeerId, packet_id: u8, @@ -178,9 +178,10 @@ impl SyncSupplier { Err(PacketProcessError::Decoder(e)) => { debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e) } - Err(PacketProcessError::ClientBusy) => { - sync.write().add_delayed_request(peer, packet_id, data) - } + Err(PacketProcessError::ClientBusy) => sync + .write() + .unwrap() + .add_delayed_request(peer, packet_id, data), Ok(()) => {} } } @@ -420,7 +421,7 @@ mod test { use ethereum_types::H256; use parking_lot::RwLock; use rlp::{Rlp, RlpStream}; - use std::collections::VecDeque; + use std::{collections::VecDeque, sync::RwLock as StdRwLock}; use tests::{helpers::TestIo, snapshot::TestSnapshotService}; #[test] @@ -644,7 +645,7 @@ mod test { io.sender = Some(2usize); SyncSupplier::dispatch_packet( - &RwLock::new(sync), + &StdRwLock::new(sync), &mut io, 0usize, GetReceiptsPacket.id(), diff --git a/crates/ethcore/sync/src/tests/chain.rs b/crates/ethcore/sync/src/tests/chain.rs index 3cc12729b..fdcb5e195 100644 --- a/crates/ethcore/sync/src/tests/chain.rs +++ b/crates/ethcore/sync/src/tests/chain.rs @@ -57,7 +57,7 @@ fn status_after_sync() { net.peer(1).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync(); - let status = net.peer(0).sync.read().status(); + let status = net.peer(0).sync.read().unwrap().status(); assert_eq!(status.state, SyncState::Idle); } @@ -177,19 +177,22 @@ fn restart() { assert!(net.peer(0).chain.chain_info().best_block_number > 100); net.restart_peer(0); - let status = net.peer(0).sync.read().status(); + let status = net.peer(0).sync.read().unwrap().status(); assert_eq!(status.state, SyncState::Idle); } #[test] fn status_empty() { let net = TestNet::new(2); - assert_eq!(net.peer(0).sync.read().status().state, SyncState::Idle); + assert_eq!( + net.peer(0).sync.read().unwrap().status().state, + SyncState::Idle + ); let mut config = SyncConfig::default(); config.warp_sync = WarpSync::Enabled; let net = TestNet::new_with_config(2, config); assert_eq!( - net.peer(0).sync.read().status().state, + net.peer(0).sync.read().unwrap().status().state, SyncState::WaitingPeers ); } diff --git a/crates/ethcore/sync/src/tests/helpers.rs b/crates/ethcore/sync/src/tests/helpers.rs index 4beb7d1f1..89eb22dd6 100644 --- a/crates/ethcore/sync/src/tests/helpers.rs +++ b/crates/ethcore/sync/src/tests/helpers.rs @@ -37,7 +37,10 @@ use network::{self, client_version::ClientVersion, PacketId, PeerId, ProtocolId, use parking_lot::RwLock; use std::{ collections::{HashMap, HashSet, VecDeque}, - sync::Arc, + sync::{ + Arc, RwLock as StdRwLock, RwLockReadGuard as StdRwLockReadGuard, + RwLockWriteGuard as StdRwLockWriteGuard, + }, }; use sync_io::SyncIo; use tests::snapshot::*; @@ -249,7 +252,7 @@ where pub chain: Arc, pub miner: Arc, pub snapshot_service: Arc, - pub sync: RwLock, + pub sync: StdRwLock, pub queue: RwLock>, pub io_queue: RwLock>, new_blocks_queue: RwLock>, @@ -270,15 +273,17 @@ where fn process_io_message(&self, message: ChainMessageType) { let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); match message { - ChainMessageType::Consensus(data) => { - self.sync.write().propagate_consensus_packet(&mut io, data) - } + ChainMessageType::Consensus(data) => self + .sync + .write() + .unwrap() + .propagate_consensus_packet(&mut io, data), } } fn process_new_block_message(&self, message: NewBlockMessage) { let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); - self.sync.write().chain_new_blocks( + self.sync.write().unwrap().chain_new_blocks( &mut io, &message.imported, &message.invalid, @@ -294,8 +299,8 @@ impl Peer for EthPeer { type Message = TestPacket; fn on_connect(&self, other: PeerId) { - self.sync.write().update_targets(&*self.chain); - self.sync.write().on_peer_connected( + self.sync.write().unwrap().update_targets(&*self.chain); + self.sync.write().unwrap().on_peer_connected( &mut TestIo::new( &*self.chain, &self.snapshot_service, @@ -313,7 +318,7 @@ impl Peer for EthPeer { &self.queue, Some(other), ); - self.sync.write().on_peer_aborting(&mut io, other); + self.sync.write().unwrap().on_peer_aborting(&mut io, other); } fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet { @@ -340,14 +345,17 @@ impl Peer for EthPeer { fn sync_step(&self) { let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); self.chain.flush(); - self.sync.write().maintain_peers(&mut io); - self.sync.write().maintain_sync(&mut io); - self.sync.write().continue_sync(&mut io); - self.sync.write().propagate_new_transactions(&mut io); + self.sync.write().unwrap().maintain_peers(&mut io); + self.sync.write().unwrap().maintain_sync(&mut io); + self.sync.write().unwrap().continue_sync(&mut io); + self.sync + .write() + .unwrap() + .propagate_new_transactions(&mut io); } fn restart_sync(&self) { - self.sync.write().restart(&mut TestIo::new( + self.sync.write().unwrap().restart(&mut TestIo::new( &*self.chain, &self.snapshot_service, &self.queue, @@ -400,7 +408,7 @@ impl TestNet> { let ss = Arc::new(TestSnapshotService::new()); let sync = ChainSync::new(config.clone(), &chain, ForkFilterApi::new_dummy(&chain)); net.peers.push(Arc::new(EthPeer { - sync: RwLock::new(sync), + sync: StdRwLock::new(sync), snapshot_service: ss, chain: Arc::new(chain), miner: Arc::new(Miner::new_for_tests(&Spec::new_test(), None)), @@ -449,7 +457,7 @@ impl TestNet> { let ss = Arc::new(TestSnapshotService::new()); let sync = ChainSync::new(config, &*client, ForkFilterApi::new_dummy(&*client)); let peer = Arc::new(EthPeer { - sync: RwLock::new(sync), + sync: StdRwLock::new(sync), snapshot_service: ss, chain: client, miner, @@ -558,7 +566,7 @@ where impl TestNet> { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { let peer = &mut self.peers[peer_id]; - peer.sync.write().chain_new_blocks( + peer.sync.write().unwrap().chain_new_blocks( &mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None), &[], &[],