replace synchronization primitives with those from parking_lot (#1593)

* parking_lot in Cargo.toml

* replace all lock invocations with parking_lot ones

* use parking_lot synchronization primitives
This commit is contained in:
Robert Habermeier
2016-07-13 19:59:59 +02:00
committed by Gav Wood
parent 4226c0f631
commit 36d3d0d7d7
50 changed files with 547 additions and 550 deletions

View File

@@ -35,6 +35,7 @@ vergen = "0.1"
target_info = "0.1"
bigint = { path = "bigint" }
chrono = "0.2"
parking_lot = "0.2.6"
using_queue = { path = "using_queue" }
table = { path = "table" }
ansi_term = "0.7"

View File

@@ -14,18 +14,19 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::*;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::collections::HashMap;
use mio::*;
use crossbeam::sync::chase_lev;
use slab::Slab;
use error::*;
use misc::*;
use io::{IoError, IoHandler};
use io::worker::{Worker, Work, WorkType};
use panics::*;
use parking_lot::{Condvar, RwLock, Mutex};
/// Timer ID
pub type TimerToken = usize;
/// Timer ID
@@ -228,7 +229,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
if let Some(handler) = self.handlers.get(handler_index) {
if let Some(timer) = self.timers.unwrapped_read().get(&token.as_usize()) {
if let Some(timer) = self.timers.read().get(&token.as_usize()) {
event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer");
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index });
self.work_ready.notify_all();
@@ -250,7 +251,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
// TODO: flush event loop
self.handlers.remove(handler_id);
// unregister timers
let mut timers = self.timers.unwrapped_write();
let mut timers = self.timers.write();
let to_remove: Vec<_> = timers.keys().cloned().filter(|timer_id| timer_id / TOKENS_PER_HANDLER == handler_id).collect();
for timer_id in to_remove {
let timer = timers.remove(&timer_id).expect("to_remove only contains keys from timers; qed");
@@ -260,11 +261,11 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
IoMessage::AddTimer { handler_id, token, delay } => {
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
let timeout = event_loop.timeout_ms(Token(timer_id), delay).expect("Error registering user timer");
self.timers.unwrapped_write().insert(timer_id, UserTimer { delay: delay, timeout: timeout });
self.timers.write().insert(timer_id, UserTimer { delay: delay, timeout: timeout });
},
IoMessage::RemoveTimer { handler_id, token } => {
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
if let Some(timer) = self.timers.unwrapped_write().remove(&timer_id) {
if let Some(timer) = self.timers.write().remove(&timer_id) {
event_loop.clear_timeout(timer.timeout);
}
},
@@ -278,7 +279,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
handler.deregister_stream(token, event_loop);
// unregister a timer associated with the token (if any)
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
if let Some(timer) = self.timers.unwrapped_write().remove(&timer_id) {
if let Some(timer) = self.timers.write().remove(&timer_id) {
event_loop.clear_timeout(timer.timeout);
}
}

View File

@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::*;
use std::sync::Arc;
use std::mem;
use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
@@ -22,7 +22,8 @@ use crossbeam::sync::chase_lev;
use io::service::{HandlerId, IoChannel, IoContext};
use io::{IoHandler};
use panics::*;
use misc::Lockable;
use parking_lot::{Condvar, Mutex};
pub enum WorkType<Message> {
Readable,
@@ -82,11 +83,11 @@ impl Worker {
where Message: Send + Sync + Clone + 'static {
loop {
{
let lock = wait_mutex.locked();
let mut lock = wait_mutex.lock();
if deleting.load(AtomicOrdering::Acquire) {
return;
}
let _ = wait.wait(lock).unwrap();
let _ = wait.wait(&mut lock);
}
if deleting.load(AtomicOrdering::Acquire) {

View File

@@ -20,7 +20,6 @@ use common::*;
use rlp::*;
use hashdb::*;
use memorydb::*;
use misc::RwLockable;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY};
use super::traits::JournalDB;
use kvdb::{Database, DBTransaction, DatabaseConfig};
@@ -226,7 +225,7 @@ impl EarlyMergeDB {
#[cfg(test)]
fn can_reconstruct_refs(&self) -> bool {
let (latest_era, reconstructed) = Self::read_refs(&self.backing);
let refs = self.refs.as_ref().unwrap().unwrapped_write();
let refs = self.refs.as_ref().unwrap().write();
if *refs != reconstructed || latest_era != self.latest_era {
let clean_refs = refs.iter().filter_map(|(k, v)| if reconstructed.get(k) == Some(v) {None} else {Some((k.clone(), v.clone()))}).collect::<HashMap<_, _>>();
let clean_recon = reconstructed.into_iter().filter_map(|(k, v)| if refs.get(&k) == Some(&v) {None} else {Some((k.clone(), v.clone()))}).collect::<HashMap<_, _>>();
@@ -334,7 +333,7 @@ impl JournalDB for EarlyMergeDB {
fn mem_used(&self) -> usize {
self.overlay.mem_used() + match self.refs {
Some(ref c) => c.unwrapped_read().heap_size_of_children(),
Some(ref c) => c.read().heap_size_of_children(),
None => 0
}
}
@@ -390,7 +389,7 @@ impl JournalDB for EarlyMergeDB {
//
// record new commit's details.
let mut refs = self.refs.as_ref().unwrap().unwrapped_write();
let mut refs = self.refs.as_ref().unwrap().write();
let batch = DBTransaction::new();
let trace = false;
{

View File

@@ -20,7 +20,6 @@ use common::*;
use rlp::*;
use hashdb::*;
use memorydb::*;
use misc::RwLockable;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY};
use kvdb::{Database, DBTransaction, DatabaseConfig};
#[cfg(test)]
@@ -137,7 +136,7 @@ impl OverlayRecentDB {
#[cfg(test)]
fn can_reconstruct_refs(&self) -> bool {
let reconstructed = Self::read_overlay(&self.backing);
let journal_overlay = self.journal_overlay.unwrapped_read();
let journal_overlay = self.journal_overlay.read();
*journal_overlay == reconstructed
}
@@ -207,7 +206,7 @@ impl JournalDB for OverlayRecentDB {
fn mem_used(&self) -> usize {
let mut mem = self.transaction_overlay.mem_used();
let overlay = self.journal_overlay.unwrapped_read();
let overlay = self.journal_overlay.read();
mem += overlay.backing_overlay.mem_used();
mem += overlay.journal.heap_size_of_children();
mem
@@ -217,17 +216,17 @@ impl JournalDB for OverlayRecentDB {
self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none()
}
fn latest_era(&self) -> Option<u64> { self.journal_overlay.unwrapped_read().latest_era }
fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().latest_era }
fn state(&self, key: &H256) -> Option<Bytes> {
let v = self.journal_overlay.unwrapped_read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec());
let v = self.journal_overlay.read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec());
v.or_else(|| self.backing.get_by_prefix(&key[0..DB_PREFIX_LEN]).map(|b| b.to_vec()))
}
fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
// record new commit's details.
trace!("commit: #{} ({}), end era: {:?}", now, id, end);
let mut journal_overlay = self.journal_overlay.unwrapped_write();
let mut journal_overlay = self.journal_overlay.write();
let batch = DBTransaction::new();
{
let mut r = RlpStream::new_list(3);
@@ -334,7 +333,7 @@ impl HashDB for OverlayRecentDB {
match k {
Some(&(ref d, rc)) if rc > 0 => Some(d),
_ => {
let v = self.journal_overlay.unwrapped_read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec());
let v = self.journal_overlay.read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec());
match v {
Some(x) => {
Some(&self.transaction_overlay.denote(key, x).0)

View File

@@ -116,6 +116,7 @@ extern crate libc;
extern crate target_info;
extern crate bigint;
extern crate chrono;
extern crate parking_lot;
pub extern crate using_queue;
pub extern crate table;
extern crate ansi_term;

View File

@@ -18,14 +18,14 @@
use std::env;
use std::borrow::Cow;
use rlog::{LogLevelFilter};
use rlog::LogLevelFilter;
use env_logger::LogBuilder;
use std::sync::{RwLock, RwLockReadGuard};
use std::sync::atomic::{Ordering, AtomicBool};
use arrayvec::ArrayVec;
use misc::RwLockable;
pub use ansi_term::{Colour, Style};
use parking_lot::{RwLock, RwLockReadGuard};
lazy_static! {
static ref USE_COLOR: AtomicBool = AtomicBool::new(false);
}
@@ -91,7 +91,7 @@ impl RotatingLogger {
/// Append new log entry
pub fn append(&self, log: String) {
self.logs.unwrapped_write().insert(0, log);
self.logs.write().insert(0, log);
}
/// Return levels
@@ -101,7 +101,7 @@ impl RotatingLogger {
/// Return logs
pub fn logs(&self) -> RwLockReadGuard<ArrayVec<[String; LOG_SIZE]>> {
self.logs.unwrapped_read()
self.logs.read()
}
}

View File

@@ -64,29 +64,4 @@ pub fn version_data() -> Bytes {
s.append(&rustc_version());
s.append(&&Target::os()[0..2]);
s.out()
}
/// Object can be locked directly into a `MutexGuard`.
///
/// Convenience trait that collapses the usual
/// `mutex.lock().unwrap()` dance into a single call.
pub trait Lockable<T> {
    /// Lock object directly into a `MutexGuard`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned, i.e. another thread panicked
    /// while holding the lock.
    fn locked(&self) -> MutexGuard<T>;
}

impl<T> Lockable<T> for Mutex<T> {
    fn locked(&self) -> MutexGuard<T> {
        // Poisoning means a thread panicked while holding the lock, so the
        // protected state may be inconsistent; propagate the panic with a
        // descriptive message instead of a bare `unwrap`.
        self.lock().expect("mutex poisoned: a thread panicked while holding the lock")
    }
}
/// Object can be read- or write-locked directly into a guard.
///
/// Convenience trait that collapses `rwlock.read().unwrap()` /
/// `rwlock.write().unwrap()` into single calls.
pub trait RwLockable<T> {
    /// Read-lock object directly into a `RwLockReadGuard`.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned, i.e. another thread panicked
    /// while holding the write lock.
    fn unwrapped_read(&self) -> RwLockReadGuard<T>;

    /// Write-lock object directly into a `RwLockWriteGuard`.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned, i.e. another thread panicked
    /// while holding the write lock.
    fn unwrapped_write(&self) -> RwLockWriteGuard<T>;
}

impl<T> RwLockable<T> for RwLock<T> {
    fn unwrapped_read(&self) -> RwLockReadGuard<T> {
        // Propagate poisoning as a panic with a clear message: the guarded
        // state may be inconsistent after a panic under the write lock.
        self.read().expect("rwlock poisoned: a thread panicked while holding the write lock")
    }

    fn unwrapped_write(&self) -> RwLockWriteGuard<T> {
        self.write().expect("rwlock poisoned: a thread panicked while holding the write lock")
    }
}
}

View File

@@ -96,13 +96,13 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
}
},
Ok(_) => return Ok(None),
Err(e) => {
Err(e) => {
debug!(target:"network", "Read error {} ({})", self.token, e);
return Err(e)
}
}
}
}
}
/// Add a packet to send queue.
pub fn send<Message>(&mut self, io: &IoContext<Message>, data: Bytes) where Message: Send + Clone {
@@ -490,7 +490,7 @@ pub fn test_encryption() {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::*;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use super::super::stats::*;
use std::io::{Read, Write, Error, Cursor, ErrorKind};

View File

@@ -17,7 +17,7 @@
use std::net::SocketAddr;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::ops::*;
use std::cmp::min;
@@ -42,6 +42,7 @@ use network::error::{NetworkError, DisconnectReason};
use network::discovery::{Discovery, TableUpdates, NodeEntry};
use network::ip_utils::{map_external_address, select_public_address};
use path::restrict_permissions_owner;
use parking_lot::{Mutex, RwLock};
type Slab<T> = ::slab::Slab<T, usize>;
@@ -201,7 +202,7 @@ impl<'s> NetworkContext<'s> {
protocol: ProtocolId,
session: Option<SharedSession>, sessions: Arc<RwLock<Slab<SharedSession>>>,
reserved_peers: &'s HashSet<NodeId>) -> NetworkContext<'s> {
let id = session.as_ref().map(|s| s.locked().token());
let id = session.as_ref().map(|s| s.lock().token());
NetworkContext {
io: io,
protocol: protocol,
@@ -215,7 +216,7 @@ impl<'s> NetworkContext<'s> {
fn resolve_session(&self, peer: PeerId) -> Option<SharedSession> {
match self.session_id {
Some(id) if id == peer => self.session.clone(),
_ => self.sessions.unwrapped_read().get(peer).cloned(),
_ => self.sessions.read().get(peer).cloned(),
}
}
@@ -223,7 +224,7 @@ impl<'s> NetworkContext<'s> {
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
let session = self.resolve_session(peer);
if let Some(session) = session {
try!(session.locked().send_packet(self.io, self.protocol, packet_id as u8, &data));
try!(session.lock().send_packet(self.io, self.protocol, packet_id as u8, &data));
} else {
trace!(target: "network", "Send: Peer no longer exist")
}
@@ -256,7 +257,7 @@ impl<'s> NetworkContext<'s> {
/// Check if the session is still active.
pub fn is_expired(&self) -> bool {
self.session.as_ref().map_or(false, |s| s.locked().expired())
self.session.as_ref().map_or(false, |s| s.lock().expired())
}
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
@@ -273,7 +274,7 @@ impl<'s> NetworkContext<'s> {
pub fn peer_info(&self, peer: PeerId) -> String {
let session = self.resolve_session(peer);
if let Some(session) = session {
return session.locked().info.client_version.clone()
return session.lock().info.client_version.clone()
}
"unknown".to_owned()
}
@@ -416,8 +417,8 @@ impl Host {
Ok(n) => {
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
self.nodes.unwrapped_write().add_node(n);
if let Some(ref mut discovery) = *self.discovery.locked() {
self.nodes.write().add_node(n);
if let Some(ref mut discovery) = *self.discovery.lock() {
discovery.add_node(entry);
}
}
@@ -428,9 +429,9 @@ impl Host {
let n = try!(Node::from_str(id));
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
self.reserved_nodes.unwrapped_write().insert(n.id.clone());
self.reserved_nodes.write().insert(n.id.clone());
if let Some(ref mut discovery) = *self.discovery.locked() {
if let Some(ref mut discovery) = *self.discovery.lock() {
discovery.add_node(entry);
}
@@ -438,17 +439,17 @@ impl Host {
}
pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode, io: &IoContext<NetworkIoMessage>) {
let mut info = self.info.unwrapped_write();
let mut info = self.info.write();
if info.config.non_reserved_mode != mode {
info.config.non_reserved_mode = mode.clone();
drop(info);
if let NonReservedPeerMode::Deny = mode {
// disconnect all non-reserved peers here.
let reserved: HashSet<NodeId> = self.reserved_nodes.unwrapped_read().clone();
let reserved: HashSet<NodeId> = self.reserved_nodes.read().clone();
let mut to_kill = Vec::new();
for e in self.sessions.unwrapped_write().iter_mut() {
let mut s = e.locked();
for e in self.sessions.write().iter_mut() {
let mut s = e.lock();
{
let id = s.id();
if id.is_some() && reserved.contains(id.unwrap()) {
@@ -469,7 +470,7 @@ impl Host {
pub fn remove_reserved_node(&self, id: &str) -> Result<(), UtilError> {
let n = try!(Node::from_str(id));
self.reserved_nodes.unwrapped_write().remove(&n.id);
self.reserved_nodes.write().remove(&n.id);
Ok(())
}
@@ -479,11 +480,11 @@ impl Host {
}
pub fn external_url(&self) -> Option<String> {
self.info.unwrapped_read().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.unwrapped_read().id().clone(), e.clone())))
self.info.read().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.read().id().clone(), e.clone())))
}
pub fn local_url(&self) -> String {
let r = format!("{}", Node::new(self.info.unwrapped_read().id().clone(), self.info.unwrapped_read().local_endpoint.clone()));
let r = format!("{}", Node::new(self.info.read().id().clone(), self.info.read().local_endpoint.clone()));
println!("{}", r);
r
}
@@ -491,8 +492,8 @@ impl Host {
pub fn stop(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), UtilError> {
self.stopping.store(true, AtomicOrdering::Release);
let mut to_kill = Vec::new();
for e in self.sessions.unwrapped_write().iter_mut() {
let mut s = e.locked();
for e in self.sessions.write().iter_mut() {
let mut s = e.lock();
s.disconnect(io, DisconnectReason::ClientQuit);
to_kill.push(s.token());
}
@@ -505,16 +506,16 @@ impl Host {
}
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), UtilError> {
if self.info.unwrapped_read().public_endpoint.is_some() {
if self.info.read().public_endpoint.is_some() {
return Ok(());
}
let local_endpoint = self.info.unwrapped_read().local_endpoint.clone();
let public_address = self.info.unwrapped_read().config.public_address.clone();
let local_endpoint = self.info.read().local_endpoint.clone();
let public_address = self.info.read().config.public_address.clone();
let public_endpoint = match public_address {
None => {
let public_address = select_public_address(local_endpoint.address.port());
let public_endpoint = NodeEndpoint { address: public_address, udp_port: local_endpoint.udp_port };
if self.info.unwrapped_read().config.nat_enabled {
if self.info.read().config.nat_enabled {
match map_external_address(&local_endpoint) {
Some(endpoint) => {
info!("NAT mapped to external address {}", endpoint.address);
@@ -529,7 +530,7 @@ impl Host {
Some(addr) => NodeEndpoint { address: addr, udp_port: local_endpoint.udp_port }
};
self.info.unwrapped_write().public_endpoint = Some(public_endpoint.clone());
self.info.write().public_endpoint = Some(public_endpoint.clone());
if let Some(url) = self.external_url() {
io.message(NetworkIoMessage::NetworkStarted(url)).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e));
@@ -537,7 +538,7 @@ impl Host {
// Initialize discovery.
let discovery = {
let info = self.info.unwrapped_read();
let info = self.info.read();
if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept {
let mut udp_addr = local_endpoint.address.clone();
udp_addr.set_port(local_endpoint.udp_port);
@@ -546,11 +547,11 @@ impl Host {
};
if let Some(mut discovery) = discovery {
discovery.init_node_list(self.nodes.unwrapped_read().unordered_entries());
for n in self.nodes.unwrapped_read().unordered_entries() {
discovery.init_node_list(self.nodes.read().unordered_entries());
for n in self.nodes.read().unordered_entries() {
discovery.add_node(n.clone());
}
*self.discovery.locked() = Some(discovery);
*self.discovery.lock() = Some(discovery);
io.register_stream(DISCOVERY).expect("Error registering UDP listener");
io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer");
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
@@ -566,7 +567,7 @@ impl Host {
}
fn have_session(&self, id: &NodeId) -> bool {
self.sessions.unwrapped_read().iter().any(|e| e.locked().info.id == Some(id.clone()))
self.sessions.read().iter().any(|e| e.lock().info.id == Some(id.clone()))
}
fn session_count(&self) -> usize {
@@ -574,17 +575,17 @@ impl Host {
}
fn connecting_to(&self, id: &NodeId) -> bool {
self.sessions.unwrapped_read().iter().any(|e| e.locked().id() == Some(id))
self.sessions.read().iter().any(|e| e.lock().id() == Some(id))
}
fn handshake_count(&self) -> usize {
self.sessions.unwrapped_read().count() - self.session_count()
self.sessions.read().count() - self.session_count()
}
fn keep_alive(&self, io: &IoContext<NetworkIoMessage>) {
let mut to_kill = Vec::new();
for e in self.sessions.unwrapped_write().iter_mut() {
let mut s = e.locked();
for e in self.sessions.write().iter_mut() {
let mut s = e.lock();
if !s.keep_alive(io) {
s.disconnect(io, DisconnectReason::PingTimeout);
to_kill.push(s.token());
@@ -598,7 +599,7 @@ impl Host {
fn connect_peers(&self, io: &IoContext<NetworkIoMessage>) {
let (ideal_peers, mut pin) = {
let info = self.info.unwrapped_read();
let info = self.info.read();
if info.capabilities.is_empty() {
return;
}
@@ -608,7 +609,7 @@ impl Host {
};
let session_count = self.session_count();
let reserved_nodes = self.reserved_nodes.unwrapped_read();
let reserved_nodes = self.reserved_nodes.read();
if session_count >= ideal_peers as usize + reserved_nodes.len() {
// check if all pinned nodes are connected.
if reserved_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) {
@@ -629,7 +630,7 @@ impl Host {
// iterate over all nodes, reserved ones coming first.
// if we are pinned to only reserved nodes, ignore all others.
let nodes = reserved_nodes.iter().cloned().chain(if !pin {
self.nodes.unwrapped_read().nodes()
self.nodes.read().nodes()
} else {
Vec::new()
});
@@ -657,7 +658,7 @@ impl Host {
let socket = {
let address = {
let mut nodes = self.nodes.unwrapped_write();
let mut nodes = self.nodes.write();
if let Some(node) = nodes.get_mut(id) {
node.last_attempted = Some(::time::now());
node.endpoint.address
@@ -682,11 +683,11 @@ impl Host {
#[cfg_attr(feature="dev", allow(block_in_if_condition_stmt))]
fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext<NetworkIoMessage>) -> Result<(), UtilError> {
let nonce = self.info.unwrapped_write().next_nonce();
let mut sessions = self.sessions.unwrapped_write();
let nonce = self.info.write().next_nonce();
let mut sessions = self.sessions.write();
let token = sessions.insert_with_opt(|token| {
match Session::new(io, socket, token, id, &nonce, self.stats.clone(), &self.info.unwrapped_read()) {
match Session::new(io, socket, token, id, &nonce, self.stats.clone(), &self.info.read()) {
Ok(s) => Some(Arc::new(Mutex::new(s))),
Err(e) => {
debug!(target: "network", "Session create error: {:?}", e);
@@ -707,7 +708,7 @@ impl Host {
fn accept(&self, io: &IoContext<NetworkIoMessage>) {
trace!(target: "network", "Accepting incoming connection");
loop {
let socket = match self.tcp_listener.locked().accept() {
let socket = match self.tcp_listener.lock().accept() {
Ok(None) => break,
Ok(Some((sock, _addr))) => sock,
Err(e) => {
@@ -722,11 +723,11 @@ impl Host {
}
fn session_writable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
let session = { self.sessions.unwrapped_read().get(token).cloned() };
let session = { self.sessions.read().get(token).cloned() };
if let Some(session) = session {
let mut s = session.locked();
if let Err(e) = s.writable(io, &self.info.unwrapped_read()) {
let mut s = session.lock();
if let Err(e) = s.writable(io, &self.info.read()) {
trace!(target: "network", "Session write error: {}: {:?}", token, e);
}
if s.done() {
@@ -745,16 +746,16 @@ impl Host {
let mut ready_data: Vec<ProtocolId> = Vec::new();
let mut packet_data: Vec<(ProtocolId, PacketId, Vec<u8>)> = Vec::new();
let mut kill = false;
let session = { self.sessions.unwrapped_read().get(token).cloned() };
let session = { self.sessions.read().get(token).cloned() };
if let Some(session) = session.clone() {
let mut s = session.locked();
let mut s = session.lock();
loop {
match s.readable(io, &self.info.unwrapped_read()) {
match s.readable(io, &self.info.read()) {
Err(e) => {
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
if let UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) = e {
if let Some(id) = s.id() {
self.nodes.unwrapped_write().mark_as_useless(id);
self.nodes.write().mark_as_useless(id);
}
}
kill = true;
@@ -764,9 +765,9 @@ impl Host {
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
if !s.info.originated {
let session_count = self.session_count();
let reserved_nodes = self.reserved_nodes.unwrapped_read();
let reserved_nodes = self.reserved_nodes.read();
let (ideal_peers, reserved_only) = {
let info = self.info.unwrapped_read();
let info = self.info.read();
(info.config.ideal_peers, info.config.non_reserved_mode == NonReservedPeerMode::Deny)
};
@@ -781,14 +782,14 @@ impl Host {
// Add it to the node table
if let Ok(address) = s.remote_addr() {
let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
self.nodes.unwrapped_write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
let mut discovery = self.discovery.locked();
self.nodes.write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
let mut discovery = self.discovery.lock();
if let Some(ref mut discovery) = *discovery.deref_mut() {
discovery.add_node(entry);
}
}
}
for (p, _) in self.handlers.unwrapped_read().iter() {
for (p, _) in self.handlers.read().iter() {
if s.have_capability(p) {
ready_data.push(p);
}
@@ -799,7 +800,7 @@ impl Host {
protocol,
packet_id,
}) => {
match self.handlers.unwrapped_read().get(protocol) {
match self.handlers.read().get(protocol) {
None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) },
Some(_) => packet_data.push((protocol, packet_id, data)),
}
@@ -812,16 +813,16 @@ impl Host {
if kill {
self.kill_connection(token, io, true);
}
let handlers = self.handlers.unwrapped_read();
let handlers = self.handlers.read();
for p in ready_data {
let h = handlers.get(p).unwrap().clone();
self.stats.inc_sessions();
let reserved = self.reserved_nodes.unwrapped_read();
let reserved = self.reserved_nodes.read();
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
}
for (p, packet_id, data) in packet_data {
let h = handlers.get(p).unwrap().clone();
let reserved = self.reserved_nodes.unwrapped_read();
let reserved = self.reserved_nodes.read();
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]);
}
}
@@ -837,14 +838,14 @@ impl Host {
let mut deregister = false;
let mut expired_session = None;
if let FIRST_SESSION ... LAST_SESSION = token {
let sessions = self.sessions.unwrapped_write();
let sessions = self.sessions.write();
if let Some(session) = sessions.get(token).cloned() {
expired_session = Some(session.clone());
let mut s = session.locked();
let mut s = session.lock();
if !s.expired() {
if s.is_ready() {
self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
for (p, _) in self.handlers.unwrapped_read().iter() {
for (p, _) in self.handlers.read().iter() {
if s.have_capability(p) {
to_disconnect.push(p);
}
@@ -858,12 +859,12 @@ impl Host {
}
if let Some(id) = failure_id {
if remote {
self.nodes.unwrapped_write().note_failure(&id);
self.nodes.write().note_failure(&id);
}
}
for p in to_disconnect {
let h = self.handlers.unwrapped_read().get(p).unwrap().clone();
let reserved = self.reserved_nodes.unwrapped_read();
let h = self.handlers.read().get(p).unwrap().clone();
let reserved = self.reserved_nodes.read();
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
}
if deregister {
@@ -874,9 +875,9 @@ impl Host {
fn update_nodes(&self, io: &IoContext<NetworkIoMessage>, node_changes: TableUpdates) {
let mut to_remove: Vec<PeerId> = Vec::new();
{
let sessions = self.sessions.unwrapped_write();
let sessions = self.sessions.write();
for c in sessions.iter() {
let s = c.locked();
let s = c.lock();
if let Some(id) = s.id() {
if node_changes.removed.contains(id) {
to_remove.push(s.token());
@@ -888,11 +889,11 @@ impl Host {
trace!(target: "network", "Removed from node table: {}", i);
self.kill_connection(i, io, false);
}
self.nodes.unwrapped_write().update(node_changes);
self.nodes.write().update(node_changes);
}
pub fn with_context<F>(&self, protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F) where F: Fn(&NetworkContext) {
let reserved = { self.reserved_nodes.unwrapped_read() };
let reserved = { self.reserved_nodes.read() };
let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved);
action(&context);
@@ -922,7 +923,7 @@ impl IoHandler<NetworkIoMessage> for Host {
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
DISCOVERY => {
let node_changes = { self.discovery.locked().as_mut().unwrap().readable(io) };
let node_changes = { self.discovery.lock().as_mut().unwrap().readable(io) };
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
@@ -939,7 +940,7 @@ impl IoHandler<NetworkIoMessage> for Host {
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
DISCOVERY => {
self.discovery.locked().as_mut().unwrap().writable(io);
self.discovery.lock().as_mut().unwrap().writable(io);
}
_ => panic!("Received unknown writable token"),
}
@@ -953,11 +954,11 @@ impl IoHandler<NetworkIoMessage> for Host {
IDLE => self.maintain_network(io),
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
DISCOVERY_REFRESH => {
self.discovery.locked().as_mut().unwrap().refresh();
self.discovery.lock().as_mut().unwrap().refresh();
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
},
DISCOVERY_ROUND => {
let node_changes = { self.discovery.locked().as_mut().unwrap().round() };
let node_changes = { self.discovery.lock().as_mut().unwrap().round() };
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
@@ -965,13 +966,13 @@ impl IoHandler<NetworkIoMessage> for Host {
},
NODE_TABLE => {
trace!(target: "network", "Refreshing node table");
self.nodes.unwrapped_write().clear_useless();
self.nodes.write().clear_useless();
},
_ => match self.timers.unwrapped_read().get(&token).cloned() {
Some(timer) => match self.handlers.unwrapped_read().get(timer.protocol).cloned() {
_ => match self.timers.read().get(&token).cloned() {
Some(timer) => match self.handlers.read().get(timer.protocol).cloned() {
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => {
let reserved = self.reserved_nodes.unwrapped_read();
let reserved = self.reserved_nodes.read();
h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone(), &reserved), timer.token);
}
},
@@ -991,10 +992,10 @@ impl IoHandler<NetworkIoMessage> for Host {
ref versions
} => {
let h = handler.clone();
let reserved = self.reserved_nodes.unwrapped_read();
let reserved = self.reserved_nodes.read();
h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved));
self.handlers.unwrapped_write().insert(protocol, h);
let mut info = self.info.unwrapped_write();
self.handlers.write().insert(protocol, h);
let mut info = self.info.write();
for v in versions {
info.capabilities.push(CapabilityInfo { protocol: protocol, version: *v, packet_count:0 });
}
@@ -1005,29 +1006,29 @@ impl IoHandler<NetworkIoMessage> for Host {
ref token,
} => {
let handler_token = {
let mut timer_counter = self.timer_counter.unwrapped_write();
let mut timer_counter = self.timer_counter.write();
let counter = &mut *timer_counter;
let handler_token = *counter;
*counter += 1;
handler_token
};
self.timers.unwrapped_write().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
self.timers.write().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
io.register_timer(handler_token, *delay).unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e));
},
NetworkIoMessage::Disconnect(ref peer) => {
let session = { self.sessions.unwrapped_read().get(*peer).cloned() };
let session = { self.sessions.read().get(*peer).cloned() };
if let Some(session) = session {
session.locked().disconnect(io, DisconnectReason::DisconnectRequested);
session.lock().disconnect(io, DisconnectReason::DisconnectRequested);
}
trace!(target: "network", "Disconnect requested {}", peer);
self.kill_connection(*peer, io, false);
},
NetworkIoMessage::DisablePeer(ref peer) => {
let session = { self.sessions.unwrapped_read().get(*peer).cloned() };
let session = { self.sessions.read().get(*peer).cloned() };
if let Some(session) = session {
session.locked().disconnect(io, DisconnectReason::DisconnectRequested);
if let Some(id) = session.locked().id() {
self.nodes.unwrapped_write().mark_as_useless(id)
session.lock().disconnect(io, DisconnectReason::DisconnectRequested);
if let Some(id) = session.lock().id() {
self.nodes.write().mark_as_useless(id)
}
}
trace!(target: "network", "Disabling peer {}", peer);
@@ -1042,13 +1043,13 @@ impl IoHandler<NetworkIoMessage> for Host {
fn register_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage>>) {
match stream {
FIRST_SESSION ... LAST_SESSION => {
let session = { self.sessions.unwrapped_read().get(stream).cloned() };
let session = { self.sessions.read().get(stream).cloned() };
if let Some(session) = session {
session.locked().register_socket(reg, event_loop).expect("Error registering socket");
session.lock().register_socket(reg, event_loop).expect("Error registering socket");
}
}
DISCOVERY => self.discovery.locked().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.locked(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
DISCOVERY => self.discovery.lock().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
_ => warn!("Unexpected stream registration")
}
}
@@ -1056,9 +1057,9 @@ impl IoHandler<NetworkIoMessage> for Host {
fn deregister_stream(&self, stream: StreamToken, event_loop: &mut EventLoop<IoManager<NetworkIoMessage>>) {
match stream {
FIRST_SESSION ... LAST_SESSION => {
let mut connections = self.sessions.unwrapped_write();
let mut connections = self.sessions.write();
if let Some(connection) = connections.get(stream).cloned() {
connection.locked().deregister_socket(event_loop).expect("Error deregistering socket");
connection.lock().deregister_socket(event_loop).expect("Error deregistering socket");
connections.remove(stream);
}
}
@@ -1070,13 +1071,13 @@ impl IoHandler<NetworkIoMessage> for Host {
fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage>>) {
match stream {
FIRST_SESSION ... LAST_SESSION => {
let connection = { self.sessions.unwrapped_read().get(stream).cloned() };
let connection = { self.sessions.read().get(stream).cloned() };
if let Some(connection) = connection {
connection.locked().update_socket(reg, event_loop).expect("Error updating socket");
connection.lock().update_socket(reg, event_loop).expect("Error updating socket");
}
}
DISCOVERY => self.discovery.locked().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.locked(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
DISCOVERY => self.discovery.lock().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
_ => warn!("Unexpected stream update")
}
}

View File

@@ -14,16 +14,18 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::*;
use error::*;
use panics::*;
use misc::RwLockable;
use network::{NetworkProtocolHandler, NetworkConfiguration};
use network::error::NetworkError;
use network::host::{Host, NetworkContext, NetworkIoMessage, ProtocolId};
use network::stats::NetworkStats;
use io::*;
use std::sync::Arc;
use parking_lot::RwLock;
/// IO Service with networking
/// `Message` defines a notification data type.
pub struct NetworkService {
@@ -86,19 +88,19 @@ impl NetworkService {
/// Returns external url if available.
pub fn external_url(&self) -> Option<String> {
let host = self.host.unwrapped_read();
let host = self.host.read();
host.as_ref().and_then(|h| h.external_url())
}
/// Returns external url if available.
pub fn local_url(&self) -> Option<String> {
let host = self.host.unwrapped_read();
let host = self.host.read();
host.as_ref().map(|h| h.local_url())
}
/// Start network IO
pub fn start(&self) -> Result<(), UtilError> {
let mut host = self.host.unwrapped_write();
let mut host = self.host.write();
if host.is_none() {
let h = Arc::new(try!(Host::new(self.config.clone(), self.stats.clone())));
try!(self.io_service.register_handler(h.clone()));
@@ -109,7 +111,7 @@ impl NetworkService {
/// Stop network IO
pub fn stop(&self) -> Result<(), UtilError> {
let mut host = self.host.unwrapped_write();
let mut host = self.host.write();
if let Some(ref host) = *host {
let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host
try!(host.stop(&io));
@@ -120,7 +122,7 @@ impl NetworkService {
/// Try to add a reserved peer.
pub fn add_reserved_peer(&self, peer: &str) -> Result<(), UtilError> {
let host = self.host.unwrapped_read();
let host = self.host.read();
if let Some(ref host) = *host {
host.add_reserved_node(peer)
} else {
@@ -130,7 +132,7 @@ impl NetworkService {
/// Try to remove a reserved peer.
pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), UtilError> {
let host = self.host.unwrapped_read();
let host = self.host.read();
if let Some(ref host) = *host {
host.remove_reserved_node(peer)
} else {
@@ -140,7 +142,7 @@ impl NetworkService {
/// Set the non-reserved peer mode.
pub fn set_non_reserved_mode(&self, mode: ::network::NonReservedPeerMode) {
let host = self.host.unwrapped_read();
let host = self.host.read();
if let Some(ref host) = *host {
let io_ctxt = IoContext::new(self.io_service.channel(), 0);
host.set_non_reserved_mode(mode, &io_ctxt);
@@ -150,7 +152,7 @@ impl NetworkService {
/// Executes action in the network context
pub fn with_context<F>(&self, protocol: ProtocolId, action: F) where F: Fn(&NetworkContext) {
let io = IoContext::new(self.io_service.channel(), 0);
let host = self.host.unwrapped_read();
let host = self.host.read();
if let Some(ref host) = host.as_ref() {
host.with_context(protocol, &io, action);
};

View File

@@ -18,7 +18,6 @@ use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::thread;
use std::time::*;
use common::*;
use misc::*;
use network::*;
use io::TimerToken;
use crypto::KeyPair;
@@ -47,7 +46,7 @@ impl TestProtocol {
}
pub fn got_packet(&self) -> bool {
self.packet.locked().deref()[..] == b"hello"[..]
self.packet.lock().deref()[..] == b"hello"[..]
}
pub fn got_timeout(&self) -> bool {
@@ -66,7 +65,7 @@ impl NetworkProtocolHandler for TestProtocol {
fn read(&self, _io: &NetworkContext, _peer: &PeerId, packet_id: u8, data: &[u8]) {
assert_eq!(packet_id, 33);
self.packet.locked().extend(data);
self.packet.lock().extend(data);
}
fn connected(&self, io: &NetworkContext, peer: &PeerId) {

View File

@@ -18,9 +18,10 @@
use std::thread;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::default::Default;
use misc::Lockable;
use parking_lot::Mutex;
/// Thread-safe closure for handling possible panics
pub trait OnPanicListener: Send + Sync + 'static {
@@ -89,7 +90,7 @@ impl PanicHandler {
/// Notifies all listeners in case there is a panic.
/// You should use `catch_panic` instead of calling this method explicitly.
pub fn notify_all(&self, r: String) {
let mut listeners = self.listeners.locked();
let mut listeners = self.listeners.lock();
for listener in listeners.deref_mut() {
listener.call(&r);
}
@@ -98,7 +99,7 @@ impl PanicHandler {
impl MayPanic for PanicHandler {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.listeners.locked().push(Box::new(closure));
self.listeners.lock().push(Box::new(closure));
}
}
@@ -119,50 +120,46 @@ impl<F> OnPanicListener for F
#[test]
#[ignore] // panic forwarding doesnt work on the same thread in beta
fn should_notify_listeners_about_panic () {
use std::sync::RwLock;
use misc::RwLockable;
use parking_lot::RwLock;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new();
p.on_panic(move |t| i.unwrapped_write().push(t));
p.on_panic(move |t| i.write().push(t));
// when
p.catch_panic(|| panic!("Panic!")).unwrap_err();
// then
assert!(invocations.unwrapped_read()[0] == "Panic!");
assert!(invocations.read()[0] == "Panic!");
}
#[test]
#[ignore] // panic forwarding doesnt work on the same thread in beta
fn should_notify_listeners_about_panic_when_string_is_dynamic () {
use std::sync::RwLock;
use misc::RwLockable;
use parking_lot::RwLock;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new();
p.on_panic(move |t| i.unwrapped_write().push(t));
p.on_panic(move |t| i.write().push(t));
// when
p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err();
// then
assert!(invocations.unwrapped_read()[0] == "Panic: 1");
assert!(invocations.read()[0] == "Panic: 1");
}
#[test]
fn should_notify_listeners_about_panic_in_other_thread () {
use std::thread;
use std::sync::RwLock;
use misc::RwLockable;
use parking_lot::RwLock;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new();
p.on_panic(move |t| i.unwrapped_write().push(t));
p.on_panic(move |t| i.write().push(t));
// when
let t = thread::spawn(move ||
@@ -171,20 +168,18 @@ fn should_notify_listeners_about_panic_in_other_thread () {
t.join().unwrap_err();
// then
assert!(invocations.unwrapped_read()[0] == "Panic!");
assert!(invocations.read()[0] == "Panic!");
}
#[test]
#[ignore] // panic forwarding doesnt work on the same thread in beta
fn should_forward_panics () {
use std::sync::RwLock;
use misc::RwLockable;
use parking_lot::RwLock;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new_in_arc();
p.on_panic(move |t| i.unwrapped_write().push(t));
p.on_panic(move |t| i.write().push(t));
let p2 = PanicHandler::new();
p.forward_from(&p2);
@@ -193,5 +188,5 @@ use std::sync::RwLock;
p2.catch_panic(|| panic!("Panic!")).unwrap_err();
// then
assert!(invocations.unwrapped_read()[0] == "Panic!");
assert!(invocations.read()[0] == "Panic!");
}

View File

@@ -36,7 +36,7 @@ pub use std::error::Error as StdError;
pub use std::ops::*;
pub use std::cmp::*;
pub use std::sync::*;
pub use std::sync::Arc;
pub use std::cell::*;
pub use std::collections::*;
@@ -46,3 +46,5 @@ pub use rustc_serialize::hex::{FromHex, FromHexError};
pub use heapsize::HeapSizeOf;
pub use itertools::Itertools;
pub use parking_lot::{Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};