Merge branches 'kill_unwraps' and 'dynamic-gas-price' of github.com:ethcore/parity into dynamic-gas-price

This commit is contained in:
Gav Wood
2016-07-09 12:29:06 +02:00
40 changed files with 503 additions and 460 deletions

View File

@@ -21,6 +21,7 @@ use mio::*;
use crossbeam::sync::chase_lev;
use slab::Slab;
use error::*;
use misc::*;
use io::{IoError, IoHandler};
use io::worker::{Worker, Work, WorkType};
use panics::*;
@@ -227,7 +228,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
if let Some(handler) = self.handlers.get(handler_index) {
if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) {
if let Some(timer) = self.timers.unwrapped_read().get(&token.as_usize()) {
event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer");
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index });
self.work_ready.notify_all();
@@ -249,7 +250,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
// TODO: flush event loop
self.handlers.remove(handler_id);
// unregister timers
let mut timers = self.timers.write().unwrap();
let mut timers = self.timers.unwrapped_write();
let to_remove: Vec<_> = timers.keys().cloned().filter(|timer_id| timer_id / TOKENS_PER_HANDLER == handler_id).collect();
for timer_id in to_remove {
let timer = timers.remove(&timer_id).expect("to_remove only contains keys from timers; qed");
@@ -259,11 +260,11 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
IoMessage::AddTimer { handler_id, token, delay } => {
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
let timeout = event_loop.timeout_ms(Token(timer_id), delay).expect("Error registering user timer");
self.timers.write().unwrap().insert(timer_id, UserTimer { delay: delay, timeout: timeout });
self.timers.unwrapped_write().insert(timer_id, UserTimer { delay: delay, timeout: timeout });
},
IoMessage::RemoveTimer { handler_id, token } => {
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) {
if let Some(timer) = self.timers.unwrapped_write().remove(&timer_id) {
event_loop.clear_timeout(timer.timeout);
}
},
@@ -277,7 +278,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
handler.deregister_stream(token, event_loop);
// unregister a timer associated with the token (if any)
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) {
if let Some(timer) = self.timers.unwrapped_write().remove(&timer_id) {
event_loop.clear_timeout(timer.timeout);
}
}

View File

@@ -22,6 +22,7 @@ use crossbeam::sync::chase_lev;
use io::service::{HandlerId, IoChannel, IoContext};
use io::{IoHandler};
use panics::*;
use misc::Lockable;
pub enum WorkType<Message> {
Readable,
@@ -81,7 +82,7 @@ impl Worker {
where Message: Send + Sync + Clone + 'static {
loop {
{
let lock = wait_mutex.lock().unwrap();
let lock = wait_mutex.locked();
if deleting.load(AtomicOrdering::Acquire) {
return;
}

View File

@@ -20,6 +20,7 @@ use common::*;
use rlp::*;
use hashdb::*;
use memorydb::*;
use misc::RwLockable;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY};
use super::traits::JournalDB;
use kvdb::{Database, DBTransaction, DatabaseConfig};
@@ -225,7 +226,7 @@ impl EarlyMergeDB {
#[cfg(test)]
fn can_reconstruct_refs(&self) -> bool {
let (latest_era, reconstructed) = Self::read_refs(&self.backing);
let refs = self.refs.as_ref().unwrap().write().unwrap();
let refs = self.refs.as_ref().unwrap().unwrapped_write();
if *refs != reconstructed || latest_era != self.latest_era {
let clean_refs = refs.iter().filter_map(|(k, v)| if reconstructed.get(k) == Some(v) {None} else {Some((k.clone(), v.clone()))}).collect::<HashMap<_, _>>();
let clean_recon = reconstructed.into_iter().filter_map(|(k, v)| if refs.get(&k) == Some(&v) {None} else {Some((k.clone(), v.clone()))}).collect::<HashMap<_, _>>();
@@ -333,7 +334,7 @@ impl JournalDB for EarlyMergeDB {
fn mem_used(&self) -> usize {
self.overlay.mem_used() + match self.refs {
Some(ref c) => c.read().unwrap().heap_size_of_children(),
Some(ref c) => c.unwrapped_read().heap_size_of_children(),
None => 0
}
}
@@ -385,7 +386,7 @@ impl JournalDB for EarlyMergeDB {
//
// record new commit's details.
let mut refs = self.refs.as_ref().unwrap().write().unwrap();
let mut refs = self.refs.as_ref().unwrap().unwrapped_write();
let batch = DBTransaction::new();
let trace = false;
{

View File

@@ -20,6 +20,7 @@ use common::*;
use rlp::*;
use hashdb::*;
use memorydb::*;
use misc::RwLockable;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY};
use kvdb::{Database, DBTransaction, DatabaseConfig};
#[cfg(test)]
@@ -136,7 +137,7 @@ impl OverlayRecentDB {
#[cfg(test)]
fn can_reconstruct_refs(&self) -> bool {
let reconstructed = Self::read_overlay(&self.backing);
let journal_overlay = self.journal_overlay.read().unwrap();
let journal_overlay = self.journal_overlay.unwrapped_read();
*journal_overlay == reconstructed
}
@@ -199,7 +200,7 @@ impl JournalDB for OverlayRecentDB {
fn mem_used(&self) -> usize {
let mut mem = self.transaction_overlay.mem_used();
let overlay = self.journal_overlay.read().unwrap();
let overlay = self.journal_overlay.unwrapped_read();
mem += overlay.backing_overlay.mem_used();
mem += overlay.journal.heap_size_of_children();
mem
@@ -209,12 +210,12 @@ impl JournalDB for OverlayRecentDB {
self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none()
}
fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().unwrap().latest_era }
fn latest_era(&self) -> Option<u64> { self.journal_overlay.unwrapped_read().latest_era }
fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
// record new commit's details.
trace!("commit: #{} ({}), end era: {:?}", now, id, end);
let mut journal_overlay = self.journal_overlay.write().unwrap();
let mut journal_overlay = self.journal_overlay.unwrapped_write();
let batch = DBTransaction::new();
{
let mut r = RlpStream::new_list(3);
@@ -321,7 +322,7 @@ impl HashDB for OverlayRecentDB {
match k {
Some(&(ref d, rc)) if rc > 0 => Some(d),
_ => {
let v = self.journal_overlay.read().unwrap().backing_overlay.get(key).map(|v| v.to_vec());
let v = self.journal_overlay.unwrapped_read().backing_overlay.get(key).map(|v| v.to_vec());
match v {
Some(x) => {
Some(&self.transaction_overlay.denote(key, x).0)

View File

@@ -23,6 +23,7 @@ use env_logger::LogBuilder;
use std::sync::{RwLock, RwLockReadGuard};
use std::sync::atomic::{Ordering, AtomicBool};
use arrayvec::ArrayVec;
use misc::RwLockable;
pub use ansi_term::{Colour, Style};
lazy_static! {
@@ -98,7 +99,7 @@ impl RotatingLogger {
/// Append new log entry
pub fn append(&self, log: String) {
self.logs.write().unwrap().insert(0, log);
self.logs.unwrapped_write().insert(0, log);
}
/// Return levels
@@ -108,7 +109,7 @@ impl RotatingLogger {
/// Return logs
pub fn logs(&self) -> RwLockReadGuard<ArrayVec<[String; LOG_SIZE]>> {
self.logs.read().unwrap()
self.logs.unwrapped_read()
}
}

View File

@@ -65,3 +65,28 @@ pub fn version_data() -> Bytes {
s.append(&&Target::os()[0..2]);
s.out()
}
/// Object can be locked directly into a `MutexGuard`.
pub trait Lockable<T> {
/// Lock object directly into a `MutexGuard`.
fn locked(&self) -> MutexGuard<T>;
}
impl<T> Lockable<T> for Mutex<T> {
fn locked(&self) -> MutexGuard<T> { self.lock().unwrap() }
}
/// Object can be read or write locked directly into a guard.
pub trait RwLockable<T> {
/// Read-lock object directly into a `ReadGuard`.
fn unwrapped_read(&self) -> RwLockReadGuard<T>;
/// Write-lock object directly into a `WriteGuard`.
fn unwrapped_write(&self) -> RwLockWriteGuard<T>;
}
impl<T> RwLockable<T> for RwLock<T> {
fn unwrapped_read(&self) -> RwLockReadGuard<T> { self.read().unwrap() }
fn unwrapped_write(&self) -> RwLockWriteGuard<T> { self.write().unwrap() }
}

View File

@@ -28,7 +28,7 @@ use std::fs;
use mio::*;
use mio::tcp::*;
use hash::*;
use misc::version;
use misc::*;
use crypto::*;
use sha3::Hashable;
use rlp::*;
@@ -202,7 +202,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
protocol: ProtocolId,
session: Option<SharedSession>, sessions: Arc<RwLock<Slab<SharedSession>>>,
reserved_peers: &'s HashSet<NodeId>) -> NetworkContext<'s, Message> {
let id = session.as_ref().map(|s| s.lock().unwrap().token());
let id = session.as_ref().map(|s| s.locked().token());
NetworkContext {
io: io,
protocol: protocol,
@@ -216,7 +216,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
fn resolve_session(&self, peer: PeerId) -> Option<SharedSession> {
match self.session_id {
Some(id) if id == peer => self.session.clone(),
_ => self.sessions.read().unwrap().get(peer).cloned(),
_ => self.sessions.unwrapped_read().get(peer).cloned(),
}
}
@@ -224,7 +224,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
let session = self.resolve_session(peer);
if let Some(session) = session {
try!(session.lock().unwrap().send_packet(self.io, self.protocol, packet_id as u8, &data));
try!(session.locked().send_packet(self.io, self.protocol, packet_id as u8, &data));
} else {
trace!(target: "network", "Send: Peer no longer exist")
}
@@ -262,7 +262,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
/// Check if the session is still active.
pub fn is_expired(&self) -> bool {
self.session.as_ref().map_or(false, |s| s.lock().unwrap().expired())
self.session.as_ref().map_or(false, |s| s.locked().expired())
}
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
@@ -279,7 +279,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
pub fn peer_info(&self, peer: PeerId) -> String {
let session = self.resolve_session(peer);
if let Some(session) = session {
return session.lock().unwrap().info.client_version.clone()
return session.locked().info.client_version.clone()
}
"unknown".to_owned()
}
@@ -422,8 +422,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
Ok(n) => {
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
self.nodes.write().unwrap().add_node(n);
if let Some(ref mut discovery) = *self.discovery.lock().unwrap() {
self.nodes.unwrapped_write().add_node(n);
if let Some(ref mut discovery) = *self.discovery.locked() {
discovery.add_node(entry);
}
}
@@ -434,9 +434,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let n = try!(Node::from_str(id));
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
self.reserved_nodes.write().unwrap().insert(n.id.clone());
self.reserved_nodes.unwrapped_write().insert(n.id.clone());
if let Some(ref mut discovery) = *self.discovery.lock().unwrap() {
if let Some(ref mut discovery) = *self.discovery.locked() {
discovery.add_node(entry);
}
@@ -444,17 +444,17 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode, io: &IoContext<NetworkIoMessage<Message>>) {
let mut info = self.info.write().unwrap();
let mut info = self.info.unwrapped_write();
if info.config.non_reserved_mode != mode {
info.config.non_reserved_mode = mode.clone();
drop(info);
if let NonReservedPeerMode::Deny = mode {
// disconnect all non-reserved peers here.
let reserved: HashSet<NodeId> = self.reserved_nodes.read().unwrap().clone();
let reserved: HashSet<NodeId> = self.reserved_nodes.unwrapped_read().clone();
let mut to_kill = Vec::new();
for e in self.sessions.write().unwrap().iter_mut() {
let mut s = e.lock().unwrap();
for e in self.sessions.unwrapped_write().iter_mut() {
let mut s = e.locked();
{
let id = s.id();
if id.is_some() && reserved.contains(id.unwrap()) {
@@ -475,7 +475,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
pub fn remove_reserved_node(&self, id: &str) -> Result<(), UtilError> {
let n = try!(Node::from_str(id));
self.reserved_nodes.write().unwrap().remove(&n.id);
self.reserved_nodes.unwrapped_write().remove(&n.id);
Ok(())
}
@@ -485,11 +485,11 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
pub fn external_url(&self) -> Option<String> {
self.info.read().unwrap().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.read().unwrap().id().clone(), e.clone())))
self.info.unwrapped_read().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.unwrapped_read().id().clone(), e.clone())))
}
pub fn local_url(&self) -> String {
let r = format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.info.read().unwrap().local_endpoint.clone()));
let r = format!("{}", Node::new(self.info.unwrapped_read().id().clone(), self.info.unwrapped_read().local_endpoint.clone()));
println!("{}", r);
r
}
@@ -497,8 +497,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
pub fn stop(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
self.stopping.store(true, AtomicOrdering::Release);
let mut to_kill = Vec::new();
for e in self.sessions.write().unwrap().iter_mut() {
let mut s = e.lock().unwrap();
for e in self.sessions.unwrapped_write().iter_mut() {
let mut s = e.locked();
s.disconnect(io, DisconnectReason::ClientQuit);
to_kill.push(s.token());
}
@@ -512,16 +512,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
io.clear_timer(INIT_PUBLIC).unwrap();
if self.info.read().unwrap().public_endpoint.is_some() {
if self.info.unwrapped_read().public_endpoint.is_some() {
return Ok(());
}
let local_endpoint = self.info.read().unwrap().local_endpoint.clone();
let public_address = self.info.read().unwrap().config.public_address.clone();
let local_endpoint = self.info.unwrapped_read().local_endpoint.clone();
let public_address = self.info.unwrapped_read().config.public_address.clone();
let public_endpoint = match public_address {
None => {
let public_address = select_public_address(local_endpoint.address.port());
let public_endpoint = NodeEndpoint { address: public_address, udp_port: local_endpoint.udp_port };
if self.info.read().unwrap().config.nat_enabled {
if self.info.unwrapped_read().config.nat_enabled {
match map_external_address(&local_endpoint) {
Some(endpoint) => {
info!("NAT mapped to external address {}", endpoint.address);
@@ -536,7 +536,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
Some(addr) => NodeEndpoint { address: addr, udp_port: local_endpoint.udp_port }
};
self.info.write().unwrap().public_endpoint = Some(public_endpoint.clone());
self.info.unwrapped_write().public_endpoint = Some(public_endpoint.clone());
if let Some(url) = self.external_url() {
io.message(NetworkIoMessage::NetworkStarted(url)).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e));
@@ -544,18 +544,18 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// Initialize discovery.
let discovery = {
let info = self.info.read().unwrap();
let info = self.info.unwrapped_read();
if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept {
Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY))
} else { None }
};
if let Some(mut discovery) = discovery {
discovery.init_node_list(self.nodes.read().unwrap().unordered_entries());
for n in self.nodes.read().unwrap().unordered_entries() {
discovery.init_node_list(self.nodes.unwrapped_read().unordered_entries());
for n in self.nodes.unwrapped_read().unordered_entries() {
discovery.add_node(n.clone());
}
*self.discovery.lock().unwrap() = Some(discovery);
*self.discovery.locked() = Some(discovery);
io.register_stream(DISCOVERY).expect("Error registering UDP listener");
io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer");
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
@@ -571,7 +571,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
fn have_session(&self, id: &NodeId) -> bool {
self.sessions.read().unwrap().iter().any(|e| e.lock().unwrap().info.id == Some(id.clone()))
self.sessions.unwrapped_read().iter().any(|e| e.locked().info.id == Some(id.clone()))
}
fn session_count(&self) -> usize {
@@ -579,17 +579,17 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
fn connecting_to(&self, id: &NodeId) -> bool {
self.sessions.read().unwrap().iter().any(|e| e.lock().unwrap().id() == Some(id))
self.sessions.unwrapped_read().iter().any(|e| e.locked().id() == Some(id))
}
fn handshake_count(&self) -> usize {
self.sessions.read().unwrap().count() - self.session_count()
self.sessions.unwrapped_read().count() - self.session_count()
}
fn keep_alive(&self, io: &IoContext<NetworkIoMessage<Message>>) {
let mut to_kill = Vec::new();
for e in self.sessions.write().unwrap().iter_mut() {
let mut s = e.lock().unwrap();
for e in self.sessions.unwrapped_write().iter_mut() {
let mut s = e.locked();
if !s.keep_alive(io) {
s.disconnect(io, DisconnectReason::PingTimeout);
to_kill.push(s.token());
@@ -603,7 +603,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
let (ideal_peers, mut pin) = {
let info = self.info.read().unwrap();
let info = self.info.unwrapped_read();
if info.capabilities.is_empty() {
return;
}
@@ -613,7 +613,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
};
let session_count = self.session_count();
let reserved_nodes = self.reserved_nodes.read().unwrap();
let reserved_nodes = self.reserved_nodes.unwrapped_read();
if session_count >= ideal_peers as usize + reserved_nodes.len() {
// check if all pinned nodes are connected.
if reserved_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) {
@@ -634,7 +634,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// iterate over all nodes, reserved ones coming first.
// if we are pinned to only reserved nodes, ignore all others.
let nodes = reserved_nodes.iter().cloned().chain(if !pin {
self.nodes.read().unwrap().nodes()
self.nodes.unwrapped_read().nodes()
} else {
Vec::new()
});
@@ -662,7 +662,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let socket = {
let address = {
let mut nodes = self.nodes.write().unwrap();
let mut nodes = self.nodes.unwrapped_write();
if let Some(node) = nodes.get_mut(id) {
node.last_attempted = Some(::time::now());
node.endpoint.address
@@ -687,10 +687,10 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
#[cfg_attr(feature="dev", allow(block_in_if_condition_stmt))]
fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
let nonce = self.info.write().unwrap().next_nonce();
let mut sessions = self.sessions.write().unwrap();
let nonce = self.info.unwrapped_write().next_nonce();
let mut sessions = self.sessions.unwrapped_write();
let token = sessions.insert_with_opt(|token| {
match Session::new(io, socket, token, id, &nonce, self.stats.clone(), &self.info.read().unwrap()) {
match Session::new(io, socket, token, id, &nonce, self.stats.clone(), &self.info.unwrapped_read()) {
Ok(s) => Some(Arc::new(Mutex::new(s))),
Err(e) => {
debug!(target: "network", "Session create error: {:?}", e);
@@ -711,7 +711,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn accept(&self, io: &IoContext<NetworkIoMessage<Message>>) {
trace!(target: "network", "Accepting incoming connection");
loop {
let socket = match self.tcp_listener.lock().unwrap().accept() {
let socket = match self.tcp_listener.locked().accept() {
Ok(None) => break,
Ok(Some((sock, _addr))) => sock,
Err(e) => {
@@ -726,10 +726,10 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
fn session_writable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
let session = { self.sessions.read().unwrap().get(token).cloned() };
let session = { self.sessions.unwrapped_read().get(token).cloned() };
if let Some(session) = session {
let mut s = session.lock().unwrap();
if let Err(e) = s.writable(io, &self.info.read().unwrap()) {
let mut s = session.locked();
if let Err(e) = s.writable(io, &self.info.unwrapped_read()) {
trace!(target: "network", "Session write error: {}: {:?}", token, e);
}
if s.done() {
@@ -748,16 +748,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut ready_data: Vec<ProtocolId> = Vec::new();
let mut packet_data: Vec<(ProtocolId, PacketId, Vec<u8>)> = Vec::new();
let mut kill = false;
let session = { self.sessions.read().unwrap().get(token).cloned() };
let session = { self.sessions.unwrapped_read().get(token).cloned() };
if let Some(session) = session.clone() {
let mut s = session.lock().unwrap();
let mut s = session.locked();
loop {
match s.readable(io, &self.info.read().unwrap()) {
match s.readable(io, &self.info.unwrapped_read()) {
Err(e) => {
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
if let UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) = e {
if let Some(id) = s.id() {
self.nodes.write().unwrap().mark_as_useless(id);
self.nodes.unwrapped_write().mark_as_useless(id);
}
}
kill = true;
@@ -767,9 +767,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
if !s.info.originated {
let session_count = self.session_count();
let reserved_nodes = self.reserved_nodes.read().unwrap();
let reserved_nodes = self.reserved_nodes.unwrapped_read();
let (ideal_peers, reserved_only) = {
let info = self.info.read().unwrap();
let info = self.info.unwrapped_read();
(info.config.ideal_peers, info.config.non_reserved_mode == NonReservedPeerMode::Deny)
};
@@ -784,14 +784,14 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// Add it no node table
if let Ok(address) = s.remote_addr() {
let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
self.nodes.write().unwrap().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
let mut discovery = self.discovery.lock().unwrap();
self.nodes.unwrapped_write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
let mut discovery = self.discovery.locked();
if let Some(ref mut discovery) = *discovery.deref_mut() {
discovery.add_node(entry);
}
}
}
for (p, _) in self.handlers.read().unwrap().iter() {
for (p, _) in self.handlers.unwrapped_read().iter() {
if s.have_capability(p) {
ready_data.push(p);
}
@@ -802,7 +802,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
protocol,
packet_id,
}) => {
match self.handlers.read().unwrap().get(protocol) {
match self.handlers.unwrapped_read().get(protocol) {
None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) },
Some(_) => packet_data.push((protocol, packet_id, data)),
}
@@ -815,16 +815,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
if kill {
self.kill_connection(token, io, true);
}
let handlers = self.handlers.read().unwrap();
let handlers = self.handlers.unwrapped_read();
for p in ready_data {
let h = handlers.get(p).unwrap().clone();
self.stats.inc_sessions();
let reserved = self.reserved_nodes.read().unwrap();
let reserved = self.reserved_nodes.unwrapped_read();
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
}
for (p, packet_id, data) in packet_data {
let h = handlers.get(p).unwrap().clone();
let reserved = self.reserved_nodes.read().unwrap();
let reserved = self.reserved_nodes.unwrapped_read();
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]);
}
}
@@ -840,14 +840,14 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut deregister = false;
let mut expired_session = None;
if let FIRST_SESSION ... LAST_SESSION = token {
let sessions = self.sessions.write().unwrap();
let sessions = self.sessions.unwrapped_write();
if let Some(session) = sessions.get(token).cloned() {
expired_session = Some(session.clone());
let mut s = session.lock().unwrap();
let mut s = session.locked();
if !s.expired() {
if s.is_ready() {
self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
for (p, _) in self.handlers.read().unwrap().iter() {
for (p, _) in self.handlers.unwrapped_read().iter() {
if s.have_capability(p) {
to_disconnect.push(p);
}
@@ -861,12 +861,12 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
if let Some(id) = failure_id {
if remote {
self.nodes.write().unwrap().note_failure(&id);
self.nodes.unwrapped_write().note_failure(&id);
}
}
for p in to_disconnect {
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
let reserved = self.reserved_nodes.read().unwrap();
let h = self.handlers.unwrapped_read().get(p).unwrap().clone();
let reserved = self.reserved_nodes.unwrapped_read();
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
}
if deregister {
@@ -877,9 +877,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn update_nodes(&self, io: &IoContext<NetworkIoMessage<Message>>, node_changes: TableUpdates) {
let mut to_remove: Vec<PeerId> = Vec::new();
{
let sessions = self.sessions.write().unwrap();
let sessions = self.sessions.unwrapped_write();
for c in sessions.iter() {
let s = c.lock().unwrap();
let s = c.locked();
if let Some(id) = s.id() {
if node_changes.removed.contains(id) {
to_remove.push(s.token());
@@ -891,7 +891,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
trace!(target: "network", "Removed from node table: {}", i);
self.kill_connection(i, io, false);
}
self.nodes.write().unwrap().update(node_changes);
self.nodes.unwrapped_write().update(node_changes);
}
}
@@ -918,7 +918,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
DISCOVERY => {
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().readable(io) };
let node_changes = { self.discovery.locked().as_mut().unwrap().readable(io) };
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
@@ -935,7 +935,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
DISCOVERY => {
self.discovery.lock().unwrap().as_mut().unwrap().writable(io);
self.discovery.locked().as_mut().unwrap().writable(io);
}
_ => panic!("Received unknown writable token"),
}
@@ -951,11 +951,11 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
warn!("Error initializing public interface: {:?}", e)),
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
DISCOVERY_REFRESH => {
self.discovery.lock().unwrap().as_mut().unwrap().refresh();
self.discovery.locked().as_mut().unwrap().refresh();
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
},
DISCOVERY_ROUND => {
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().round() };
let node_changes = { self.discovery.locked().as_mut().unwrap().round() };
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
@@ -963,13 +963,13 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
},
NODE_TABLE => {
trace!(target: "network", "Refreshing node table");
self.nodes.write().unwrap().clear_useless();
self.nodes.unwrapped_write().clear_useless();
},
_ => match self.timers.read().unwrap().get(&token).cloned() {
Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() {
_ => match self.timers.unwrapped_read().get(&token).cloned() {
Some(timer) => match self.handlers.unwrapped_read().get(timer.protocol).cloned() {
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => {
let reserved = self.reserved_nodes.read().unwrap();
let reserved = self.reserved_nodes.unwrapped_read();
h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone(), &reserved), timer.token);
}
},
@@ -989,10 +989,10 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
ref versions
} => {
let h = handler.clone();
let reserved = self.reserved_nodes.read().unwrap();
let reserved = self.reserved_nodes.unwrapped_read();
h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved));
self.handlers.write().unwrap().insert(protocol, h);
let mut info = self.info.write().unwrap();
self.handlers.unwrapped_write().insert(protocol, h);
let mut info = self.info.unwrapped_write();
for v in versions {
info.capabilities.push(CapabilityInfo { protocol: protocol, version: *v, packet_count:0 });
}
@@ -1003,37 +1003,37 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
ref token,
} => {
let handler_token = {
let mut timer_counter = self.timer_counter.write().unwrap();
let mut timer_counter = self.timer_counter.unwrapped_write();
let counter = &mut *timer_counter;
let handler_token = *counter;
*counter += 1;
handler_token
};
self.timers.write().unwrap().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
self.timers.unwrapped_write().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
io.register_timer(handler_token, *delay).unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e));
},
NetworkIoMessage::Disconnect(ref peer) => {
let session = { self.sessions.read().unwrap().get(*peer).cloned() };
let session = { self.sessions.unwrapped_read().get(*peer).cloned() };
if let Some(session) = session {
session.lock().unwrap().disconnect(io, DisconnectReason::DisconnectRequested);
session.locked().disconnect(io, DisconnectReason::DisconnectRequested);
}
trace!(target: "network", "Disconnect requested {}", peer);
self.kill_connection(*peer, io, false);
},
NetworkIoMessage::DisablePeer(ref peer) => {
let session = { self.sessions.read().unwrap().get(*peer).cloned() };
let session = { self.sessions.unwrapped_read().get(*peer).cloned() };
if let Some(session) = session {
session.lock().unwrap().disconnect(io, DisconnectReason::DisconnectRequested);
if let Some(id) = session.lock().unwrap().id() {
self.nodes.write().unwrap().mark_as_useless(id)
session.locked().disconnect(io, DisconnectReason::DisconnectRequested);
if let Some(id) = session.locked().id() {
self.nodes.unwrapped_write().mark_as_useless(id)
}
}
trace!(target: "network", "Disabling peer {}", peer);
self.kill_connection(*peer, io, false);
},
NetworkIoMessage::User(ref message) => {
let reserved = self.reserved_nodes.read().unwrap();
for (p, h) in self.handlers.read().unwrap().iter() {
let reserved = self.reserved_nodes.unwrapped_read();
for (p, h) in self.handlers.unwrapped_read().iter() {
h.message(&NetworkContext::new(io, p, None, self.sessions.clone(), &reserved), &message);
}
}
@@ -1044,13 +1044,13 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
fn register_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
match stream {
FIRST_SESSION ... LAST_SESSION => {
let session = { self.sessions.read().unwrap().get(stream).cloned() };
let session = { self.sessions.unwrapped_read().get(stream).cloned() };
if let Some(session) = session {
session.lock().unwrap().register_socket(reg, event_loop).expect("Error registering socket");
session.locked().register_socket(reg, event_loop).expect("Error registering socket");
}
}
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
DISCOVERY => self.discovery.locked().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.locked(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
_ => warn!("Unexpected stream registration")
}
}
@@ -1058,9 +1058,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
fn deregister_stream(&self, stream: StreamToken, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
match stream {
FIRST_SESSION ... LAST_SESSION => {
let mut connections = self.sessions.write().unwrap();
let mut connections = self.sessions.unwrapped_write();
if let Some(connection) = connections.get(stream).cloned() {
connection.lock().unwrap().deregister_socket(event_loop).expect("Error deregistering socket");
connection.locked().deregister_socket(event_loop).expect("Error deregistering socket");
connections.remove(stream);
}
}
@@ -1072,13 +1072,13 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
match stream {
FIRST_SESSION ... LAST_SESSION => {
let connection = { self.sessions.read().unwrap().get(stream).cloned() };
let connection = { self.sessions.unwrapped_read().get(stream).cloned() };
if let Some(connection) = connection {
connection.lock().unwrap().update_socket(reg, event_loop).expect("Error updating socket");
connection.locked().update_socket(reg, event_loop).expect("Error updating socket");
}
}
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
DISCOVERY => self.discovery.locked().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.locked(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
_ => warn!("Unexpected stream update")
}
}

View File

@@ -17,6 +17,7 @@
use std::sync::*;
use error::*;
use panics::*;
use misc::RwLockable;
use network::{NetworkProtocolHandler, NetworkConfiguration};
use network::error::NetworkError;
use network::host::{Host, NetworkIoMessage, ProtocolId};
@@ -85,19 +86,19 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
/// Returns external url if available.
pub fn external_url(&self) -> Option<String> {
let host = self.host.read().unwrap();
let host = self.host.unwrapped_read();
host.as_ref().and_then(|h| h.external_url())
}
/// Returns external url if available.
pub fn local_url(&self) -> Option<String> {
let host = self.host.read().unwrap();
let host = self.host.unwrapped_read();
host.as_ref().map(|h| h.local_url())
}
/// Start network IO
pub fn start(&self) -> Result<(), UtilError> {
let mut host = self.host.write().unwrap();
let mut host = self.host.unwrapped_write();
if host.is_none() {
let h = Arc::new(try!(Host::new(self.config.clone(), self.stats.clone())));
try!(self.io_service.register_handler(h.clone()));
@@ -108,7 +109,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
/// Stop network IO
pub fn stop(&self) -> Result<(), UtilError> {
let mut host = self.host.write().unwrap();
let mut host = self.host.unwrapped_write();
if let Some(ref host) = *host {
let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host
try!(host.stop(&io));
@@ -119,7 +120,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
/// Try to add a reserved peer.
pub fn add_reserved_peer(&self, peer: &str) -> Result<(), UtilError> {
let host = self.host.read().unwrap();
let host = self.host.unwrapped_read();
if let Some(ref host) = *host {
host.add_reserved_node(peer)
} else {
@@ -129,7 +130,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
/// Try to remove a reserved peer.
pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), UtilError> {
let host = self.host.read().unwrap();
let host = self.host.unwrapped_read();
if let Some(ref host) = *host {
host.remove_reserved_node(peer)
} else {
@@ -139,7 +140,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
/// Set the non-reserved peer mode.
pub fn set_non_reserved_mode(&self, mode: ::network::NonReservedPeerMode) {
let host = self.host.read().unwrap();
let host = self.host.unwrapped_read();
if let Some(ref host) = *host {
let io_ctxt = IoContext::new(self.io_service.channel(), 0);
host.set_non_reserved_mode(mode, &io_ctxt);

View File

@@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::thread;
use std::time::*;
use common::*;
use misc::*;
use network::*;
use io::TimerToken;
use crypto::KeyPair;
@@ -51,7 +52,7 @@ impl TestProtocol {
}
pub fn got_packet(&self) -> bool {
self.packet.lock().unwrap().deref()[..] == b"hello"[..]
self.packet.locked().deref()[..] == b"hello"[..]
}
pub fn got_timeout(&self) -> bool {
@@ -70,7 +71,7 @@ impl NetworkProtocolHandler<TestProtocolMessage> for TestProtocol {
fn read(&self, _io: &NetworkContext<TestProtocolMessage>, _peer: &PeerId, packet_id: u8, data: &[u8]) {
assert_eq!(packet_id, 33);
self.packet.lock().unwrap().extend(data);
self.packet.locked().extend(data);
}
fn connected(&self, io: &NetworkContext<TestProtocolMessage>, peer: &PeerId) {

View File

@@ -20,6 +20,7 @@ use std::thread;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use std::default::Default;
use misc::Lockable;
/// Thread-safe closure for handling possible panics
pub trait OnPanicListener: Send + Sync + 'static {
@@ -88,7 +89,7 @@ impl PanicHandler {
/// Notifies all listeners in case there is a panic.
/// You should use `catch_panic` instead of calling this method explicitly.
pub fn notify_all(&self, r: String) {
let mut listeners = self.listeners.lock().unwrap();
let mut listeners = self.listeners.locked();
for listener in listeners.deref_mut() {
listener.call(&r);
}
@@ -97,7 +98,7 @@ impl PanicHandler {
impl MayPanic for PanicHandler {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.listeners.lock().unwrap().push(Box::new(closure));
self.listeners.locked().push(Box::new(closure));
}
}
@@ -119,46 +120,49 @@ impl<F> OnPanicListener for F
#[ignore] // panic forwarding doesnt work on the same thread in beta
fn should_notify_listeners_about_panic () {
use std::sync::RwLock;
use misc::RwLockable;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new();
p.on_panic(move |t| i.write().unwrap().push(t));
p.on_panic(move |t| i.unwrapped_write().push(t));
// when
p.catch_panic(|| panic!("Panic!")).unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic!");
assert!(invocations.unwrapped_read()[0] == "Panic!");
}
#[test]
#[ignore] // panic forwarding doesnt work on the same thread in beta
fn should_notify_listeners_about_panic_when_string_is_dynamic () {
use std::sync::RwLock;
use misc::RwLockable;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new();
p.on_panic(move |t| i.write().unwrap().push(t));
p.on_panic(move |t| i.unwrapped_write().push(t));
// when
p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic: 1");
assert!(invocations.unwrapped_read()[0] == "Panic: 1");
}
#[test]
fn should_notify_listeners_about_panic_in_other_thread () {
use std::thread;
use std::sync::RwLock;
use misc::RwLockable;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new();
p.on_panic(move |t| i.write().unwrap().push(t));
p.on_panic(move |t| i.unwrapped_write().push(t));
// when
let t = thread::spawn(move ||
@@ -167,18 +171,20 @@ fn should_notify_listeners_about_panic_in_other_thread () {
t.join().unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic!");
assert!(invocations.unwrapped_read()[0] == "Panic!");
}
#[test]
#[ignore] // panic forwarding doesnt work on the same thread in beta
fn should_forward_panics () {
use std::sync::RwLock;
use misc::RwLockable;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new_in_arc();
p.on_panic(move |t| i.write().unwrap().push(t));
p.on_panic(move |t| i.unwrapped_write().push(t));
let p2 = PanicHandler::new();
p.forward_from(&p2);
@@ -187,5 +193,5 @@ use std::sync::RwLock;
p2.catch_panic(|| panic!("Panic!")).unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic!");
assert!(invocations.unwrapped_read()[0] == "Panic!");
}