Switch out .X().unwrap() for .unwrapped_X

This commit is contained in:
Gav Wood
2016-07-07 09:37:31 +02:00
parent 456ad9e21b
commit 3b662c285f
31 changed files with 335 additions and 311 deletions

View File

@@ -21,6 +21,7 @@ use mio::*;
use crossbeam::sync::chase_lev;
use slab::Slab;
use error::*;
use misc::*;
use io::{IoError, IoHandler};
use io::worker::{Worker, Work, WorkType};
use panics::*;
@@ -227,7 +228,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
if let Some(handler) = self.handlers.get(handler_index) {
if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) {
if let Some(timer) = self.timers.unwrapped_read().get(&token.as_usize()) {
event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer");
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index });
self.work_ready.notify_all();
@@ -249,7 +250,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
// TODO: flush event loop
self.handlers.remove(handler_id);
// unregister timers
let mut timers = self.timers.write().unwrap();
let mut timers = self.timers.unwrapped_write();
let to_remove: Vec<_> = timers.keys().cloned().filter(|timer_id| timer_id / TOKENS_PER_HANDLER == handler_id).collect();
for timer_id in to_remove {
let timer = timers.remove(&timer_id).expect("to_remove only contains keys from timers; qed");
@@ -259,11 +260,11 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
IoMessage::AddTimer { handler_id, token, delay } => {
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
let timeout = event_loop.timeout_ms(Token(timer_id), delay).expect("Error registering user timer");
self.timers.write().unwrap().insert(timer_id, UserTimer { delay: delay, timeout: timeout });
self.timers.unwrapped_write().insert(timer_id, UserTimer { delay: delay, timeout: timeout });
},
IoMessage::RemoveTimer { handler_id, token } => {
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) {
if let Some(timer) = self.timers.unwrapped_write().remove(&timer_id) {
event_loop.clear_timeout(timer.timeout);
}
},
@@ -277,7 +278,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
handler.deregister_stream(token, event_loop);
// unregister a timer associated with the token (if any)
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) {
if let Some(timer) = self.timers.unwrapped_write().remove(&timer_id) {
event_loop.clear_timeout(timer.timeout);
}
}

View File

@@ -20,6 +20,7 @@ use common::*;
use rlp::*;
use hashdb::*;
use memorydb::*;
use misc::RwLockable;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY};
use super::traits::JournalDB;
use kvdb::{Database, DBTransaction, DatabaseConfig};
@@ -225,7 +226,7 @@ impl EarlyMergeDB {
#[cfg(test)]
fn can_reconstruct_refs(&self) -> bool {
let (latest_era, reconstructed) = Self::read_refs(&self.backing);
let refs = self.refs.as_ref().unwrap().write().unwrap();
let refs = self.refs.as_ref().unwrap().unwrapped_write();
if *refs != reconstructed || latest_era != self.latest_era {
let clean_refs = refs.iter().filter_map(|(k, v)| if reconstructed.get(k) == Some(v) {None} else {Some((k.clone(), v.clone()))}).collect::<HashMap<_, _>>();
let clean_recon = reconstructed.into_iter().filter_map(|(k, v)| if refs.get(&k) == Some(&v) {None} else {Some((k.clone(), v.clone()))}).collect::<HashMap<_, _>>();
@@ -333,7 +334,7 @@ impl JournalDB for EarlyMergeDB {
fn mem_used(&self) -> usize {
self.overlay.mem_used() + match self.refs {
Some(ref c) => c.read().unwrap().heap_size_of_children(),
Some(ref c) => c.unwrapped_read().heap_size_of_children(),
None => 0
}
}
@@ -385,7 +386,7 @@ impl JournalDB for EarlyMergeDB {
//
// record new commit's details.
let mut refs = self.refs.as_ref().unwrap().write().unwrap();
let mut refs = self.refs.as_ref().unwrap().unwrapped_write();
let batch = DBTransaction::new();
let trace = false;
{

View File

@@ -20,6 +20,7 @@ use common::*;
use rlp::*;
use hashdb::*;
use memorydb::*;
use misc::RwLockable;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY};
use kvdb::{Database, DBTransaction, DatabaseConfig};
#[cfg(test)]
@@ -136,7 +137,7 @@ impl OverlayRecentDB {
#[cfg(test)]
fn can_reconstruct_refs(&self) -> bool {
let reconstructed = Self::read_overlay(&self.backing);
let journal_overlay = self.journal_overlay.read().unwrap();
let journal_overlay = self.journal_overlay.unwrapped_read();
*journal_overlay == reconstructed
}
@@ -199,7 +200,7 @@ impl JournalDB for OverlayRecentDB {
fn mem_used(&self) -> usize {
let mut mem = self.transaction_overlay.mem_used();
let overlay = self.journal_overlay.read().unwrap();
let overlay = self.journal_overlay.unwrapped_read();
mem += overlay.backing_overlay.mem_used();
mem += overlay.journal.heap_size_of_children();
mem
@@ -209,12 +210,12 @@ impl JournalDB for OverlayRecentDB {
self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none()
}
fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().unwrap().latest_era }
fn latest_era(&self) -> Option<u64> { self.journal_overlay.unwrapped_read().latest_era }
fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
// record new commit's details.
trace!("commit: #{} ({}), end era: {:?}", now, id, end);
let mut journal_overlay = self.journal_overlay.write().unwrap();
let mut journal_overlay = self.journal_overlay.unwrapped_write();
let batch = DBTransaction::new();
{
let mut r = RlpStream::new_list(3);
@@ -321,7 +322,7 @@ impl HashDB for OverlayRecentDB {
match k {
Some(&(ref d, rc)) if rc > 0 => Some(d),
_ => {
let v = self.journal_overlay.read().unwrap().backing_overlay.get(key).map(|v| v.to_vec());
let v = self.journal_overlay.unwrapped_read().backing_overlay.get(key).map(|v| v.to_vec());
match v {
Some(x) => {
Some(&self.transaction_overlay.denote(key, x).0)

View File

@@ -23,6 +23,7 @@ use env_logger::LogBuilder;
use std::sync::{RwLock, RwLockReadGuard};
use std::sync::atomic::{Ordering, AtomicBool};
use arrayvec::ArrayVec;
use misc::RwLockable;
pub use ansi_term::{Colour, Style};
lazy_static! {
@@ -90,7 +91,7 @@ impl RotatingLogger {
/// Append new log entry
pub fn append(&self, log: String) {
self.logs.write().unwrap().insert(0, log);
self.logs.unwrapped_write().insert(0, log);
}
/// Return levels
@@ -100,7 +101,7 @@ impl RotatingLogger {
/// Return logs
pub fn logs(&self) -> RwLockReadGuard<ArrayVec<[String; LOG_SIZE]>> {
self.logs.read().unwrap()
self.logs.unwrapped_read()
}
}

View File

@@ -75,3 +75,18 @@ pub trait Lockable<T> {
impl<T: Sized> Lockable<T> for Mutex<T> {
fn locked(&self) -> MutexGuard<T> { self.lock().unwrap() }
}
/// Object can be read or write locked directly into a guard.
pub trait RwLockable<T> {
/// Read-lock object directly into a `ReadGuard`.
fn unwrapped_read(&self) -> RwLockReadGuard<T>;
/// Write-lock object directly into a `WriteGuard`.
fn unwrapped_write(&self) -> RwLockWriteGuard<T>;
}
impl<T: Sized> RwLockable<T> for RwLock<T> {
fn unwrapped_read(&self) -> RwLockReadGuard<T> { self.read().unwrap() }
fn unwrapped_write(&self) -> RwLockWriteGuard<T> { self.write().unwrap() }
}

View File

@@ -216,7 +216,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
fn resolve_session(&self, peer: PeerId) -> Option<SharedSession> {
match self.session_id {
Some(id) if id == peer => self.session.clone(),
_ => self.sessions.read().unwrap().get(peer).cloned(),
_ => self.sessions.unwrapped_read().get(peer).cloned(),
}
}
@@ -422,7 +422,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
Ok(n) => {
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
self.nodes.write().unwrap().add_node(n);
self.nodes.unwrapped_write().add_node(n);
if let Some(ref mut discovery) = *self.discovery.locked() {
discovery.add_node(entry);
}
@@ -434,7 +434,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let n = try!(Node::from_str(id));
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
self.reserved_nodes.write().unwrap().insert(n.id.clone());
self.reserved_nodes.unwrapped_write().insert(n.id.clone());
if let Some(ref mut discovery) = *self.discovery.locked() {
discovery.add_node(entry);
@@ -444,16 +444,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode, io: &IoContext<NetworkIoMessage<Message>>) {
let mut info = self.info.write().unwrap();
let mut info = self.info.unwrapped_write();
if info.config.non_reserved_mode != mode {
info.config.non_reserved_mode = mode.clone();
drop(info);
if let NonReservedPeerMode::Deny = mode {
// disconnect all non-reserved peers here.
let reserved: HashSet<NodeId> = self.reserved_nodes.read().unwrap().clone();
let reserved: HashSet<NodeId> = self.reserved_nodes.unwrapped_read().clone();
let mut to_kill = Vec::new();
for e in self.sessions.write().unwrap().iter_mut() {
for e in self.sessions.unwrapped_write().iter_mut() {
let mut s = e.locked();
{
let id = s.id();
@@ -475,7 +475,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
pub fn remove_reserved_node(&self, id: &str) -> Result<(), UtilError> {
let n = try!(Node::from_str(id));
self.reserved_nodes.write().unwrap().remove(&n.id);
self.reserved_nodes.unwrapped_write().remove(&n.id);
Ok(())
}
@@ -485,11 +485,11 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
pub fn external_url(&self) -> Option<String> {
self.info.read().unwrap().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.read().unwrap().id().clone(), e.clone())))
self.info.unwrapped_read().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.unwrapped_read().id().clone(), e.clone())))
}
pub fn local_url(&self) -> String {
let r = format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.info.read().unwrap().local_endpoint.clone()));
let r = format!("{}", Node::new(self.info.unwrapped_read().id().clone(), self.info.unwrapped_read().local_endpoint.clone()));
println!("{}", r);
r
}
@@ -497,7 +497,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
pub fn stop(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
self.stopping.store(true, AtomicOrdering::Release);
let mut to_kill = Vec::new();
for e in self.sessions.write().unwrap().iter_mut() {
for e in self.sessions.unwrapped_write().iter_mut() {
let mut s = e.locked();
s.disconnect(io, DisconnectReason::ClientQuit);
to_kill.push(s.token());
@@ -512,16 +512,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
io.clear_timer(INIT_PUBLIC).unwrap();
if self.info.read().unwrap().public_endpoint.is_some() {
if self.info.unwrapped_read().public_endpoint.is_some() {
return Ok(());
}
let local_endpoint = self.info.read().unwrap().local_endpoint.clone();
let public_address = self.info.read().unwrap().config.public_address.clone();
let local_endpoint = self.info.unwrapped_read().local_endpoint.clone();
let public_address = self.info.unwrapped_read().config.public_address.clone();
let public_endpoint = match public_address {
None => {
let public_address = select_public_address(local_endpoint.address.port());
let public_endpoint = NodeEndpoint { address: public_address, udp_port: local_endpoint.udp_port };
if self.info.read().unwrap().config.nat_enabled {
if self.info.unwrapped_read().config.nat_enabled {
match map_external_address(&local_endpoint) {
Some(endpoint) => {
info!("NAT mapped to external address {}", endpoint.address);
@@ -536,7 +536,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
Some(addr) => NodeEndpoint { address: addr, udp_port: local_endpoint.udp_port }
};
self.info.write().unwrap().public_endpoint = Some(public_endpoint.clone());
self.info.unwrapped_write().public_endpoint = Some(public_endpoint.clone());
if let Some(url) = self.external_url() {
io.message(NetworkIoMessage::NetworkStarted(url)).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e));
@@ -544,15 +544,15 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// Initialize discovery.
let discovery = {
let info = self.info.read().unwrap();
let info = self.info.unwrapped_read();
if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept {
Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY))
} else { None }
};
if let Some(mut discovery) = discovery {
discovery.init_node_list(self.nodes.read().unwrap().unordered_entries());
for n in self.nodes.read().unwrap().unordered_entries() {
discovery.init_node_list(self.nodes.unwrapped_read().unordered_entries());
for n in self.nodes.unwrapped_read().unordered_entries() {
discovery.add_node(n.clone());
}
*self.discovery.locked() = Some(discovery);
@@ -571,7 +571,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
fn have_session(&self, id: &NodeId) -> bool {
self.sessions.read().unwrap().iter().any(|e| e.locked().info.id == Some(id.clone()))
self.sessions.unwrapped_read().iter().any(|e| e.locked().info.id == Some(id.clone()))
}
fn session_count(&self) -> usize {
@@ -579,16 +579,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
fn connecting_to(&self, id: &NodeId) -> bool {
self.sessions.read().unwrap().iter().any(|e| e.locked().id() == Some(id))
self.sessions.unwrapped_read().iter().any(|e| e.locked().id() == Some(id))
}
fn handshake_count(&self) -> usize {
self.sessions.read().unwrap().count() - self.session_count()
self.sessions.unwrapped_read().count() - self.session_count()
}
fn keep_alive(&self, io: &IoContext<NetworkIoMessage<Message>>) {
let mut to_kill = Vec::new();
for e in self.sessions.write().unwrap().iter_mut() {
for e in self.sessions.unwrapped_write().iter_mut() {
let mut s = e.locked();
if !s.keep_alive(io) {
s.disconnect(io, DisconnectReason::PingTimeout);
@@ -603,7 +603,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
let (ideal_peers, mut pin) = {
let info = self.info.read().unwrap();
let info = self.info.unwrapped_read();
if info.capabilities.is_empty() {
return;
}
@@ -613,7 +613,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
};
let session_count = self.session_count();
let reserved_nodes = self.reserved_nodes.read().unwrap();
let reserved_nodes = self.reserved_nodes.unwrapped_read();
if session_count >= ideal_peers as usize + reserved_nodes.len() {
// check if all pinned nodes are connected.
if reserved_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) {
@@ -634,7 +634,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// iterate over all nodes, reserved ones coming first.
// if we are pinned to only reserved nodes, ignore all others.
let nodes = reserved_nodes.iter().cloned().chain(if !pin {
self.nodes.read().unwrap().nodes()
self.nodes.unwrapped_read().nodes()
} else {
Vec::new()
});
@@ -662,7 +662,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let socket = {
let address = {
let mut nodes = self.nodes.write().unwrap();
let mut nodes = self.nodes.unwrapped_write();
if let Some(node) = nodes.get_mut(id) {
node.last_attempted = Some(::time::now());
node.endpoint.address
@@ -687,10 +687,10 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
#[cfg_attr(feature="dev", allow(block_in_if_condition_stmt))]
fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
let nonce = self.info.write().unwrap().next_nonce();
let mut sessions = self.sessions.write().unwrap();
let nonce = self.info.unwrapped_write().next_nonce();
let mut sessions = self.sessions.unwrapped_write();
let token = sessions.insert_with_opt(|token| {
match Session::new(io, socket, token, id, &nonce, self.stats.clone(), &self.info.read().unwrap()) {
match Session::new(io, socket, token, id, &nonce, self.stats.clone(), &self.info.unwrapped_read()) {
Ok(s) => Some(Arc::new(Mutex::new(s))),
Err(e) => {
debug!(target: "network", "Session create error: {:?}", e);
@@ -726,10 +726,10 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
fn session_writable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
let session = { self.sessions.read().unwrap().get(token).cloned() };
let session = { self.sessions.unwrapped_read().get(token).cloned() };
if let Some(session) = session {
let mut s = session.locked();
if let Err(e) = s.writable(io, &self.info.read().unwrap()) {
if let Err(e) = s.writable(io, &self.info.unwrapped_read()) {
trace!(target: "network", "Session write error: {}: {:?}", token, e);
}
if s.done() {
@@ -748,16 +748,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut ready_data: Vec<ProtocolId> = Vec::new();
let mut packet_data: Vec<(ProtocolId, PacketId, Vec<u8>)> = Vec::new();
let mut kill = false;
let session = { self.sessions.read().unwrap().get(token).cloned() };
let session = { self.sessions.unwrapped_read().get(token).cloned() };
if let Some(session) = session.clone() {
let mut s = session.locked();
loop {
match s.readable(io, &self.info.read().unwrap()) {
match s.readable(io, &self.info.unwrapped_read()) {
Err(e) => {
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
if let UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) = e {
if let Some(id) = s.id() {
self.nodes.write().unwrap().mark_as_useless(id);
self.nodes.unwrapped_write().mark_as_useless(id);
}
}
kill = true;
@@ -767,9 +767,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
if !s.info.originated {
let session_count = self.session_count();
let reserved_nodes = self.reserved_nodes.read().unwrap();
let reserved_nodes = self.reserved_nodes.unwrapped_read();
let (ideal_peers, reserved_only) = {
let info = self.info.read().unwrap();
let info = self.info.unwrapped_read();
(info.config.ideal_peers, info.config.non_reserved_mode == NonReservedPeerMode::Deny)
};
@@ -784,14 +784,14 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// Add it no node table
if let Ok(address) = s.remote_addr() {
let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
self.nodes.write().unwrap().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
self.nodes.unwrapped_write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
let mut discovery = self.discovery.locked();
if let Some(ref mut discovery) = *discovery.deref_mut() {
discovery.add_node(entry);
}
}
}
for (p, _) in self.handlers.read().unwrap().iter() {
for (p, _) in self.handlers.unwrapped_read().iter() {
if s.have_capability(p) {
ready_data.push(p);
}
@@ -802,7 +802,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
protocol,
packet_id,
}) => {
match self.handlers.read().unwrap().get(protocol) {
match self.handlers.unwrapped_read().get(protocol) {
None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) },
Some(_) => packet_data.push((protocol, packet_id, data)),
}
@@ -815,16 +815,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
if kill {
self.kill_connection(token, io, true);
}
let handlers = self.handlers.read().unwrap();
let handlers = self.handlers.unwrapped_read();
for p in ready_data {
let h = handlers.get(p).unwrap().clone();
self.stats.inc_sessions();
let reserved = self.reserved_nodes.read().unwrap();
let reserved = self.reserved_nodes.unwrapped_read();
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
}
for (p, packet_id, data) in packet_data {
let h = handlers.get(p).unwrap().clone();
let reserved = self.reserved_nodes.read().unwrap();
let reserved = self.reserved_nodes.unwrapped_read();
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]);
}
}
@@ -840,14 +840,14 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut deregister = false;
let mut expired_session = None;
if let FIRST_SESSION ... LAST_SESSION = token {
let sessions = self.sessions.write().unwrap();
let sessions = self.sessions.unwrapped_write();
if let Some(session) = sessions.get(token).cloned() {
expired_session = Some(session.clone());
let mut s = session.locked();
if !s.expired() {
if s.is_ready() {
self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
for (p, _) in self.handlers.read().unwrap().iter() {
for (p, _) in self.handlers.unwrapped_read().iter() {
if s.have_capability(p) {
to_disconnect.push(p);
}
@@ -861,12 +861,12 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
if let Some(id) = failure_id {
if remote {
self.nodes.write().unwrap().note_failure(&id);
self.nodes.unwrapped_write().note_failure(&id);
}
}
for p in to_disconnect {
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
let reserved = self.reserved_nodes.read().unwrap();
let h = self.handlers.unwrapped_read().get(p).unwrap().clone();
let reserved = self.reserved_nodes.unwrapped_read();
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
}
if deregister {
@@ -877,7 +877,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn update_nodes(&self, io: &IoContext<NetworkIoMessage<Message>>, node_changes: TableUpdates) {
let mut to_remove: Vec<PeerId> = Vec::new();
{
let sessions = self.sessions.write().unwrap();
let sessions = self.sessions.unwrapped_write();
for c in sessions.iter() {
let s = c.locked();
if let Some(id) = s.id() {
@@ -891,7 +891,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
trace!(target: "network", "Removed from node table: {}", i);
self.kill_connection(i, io, false);
}
self.nodes.write().unwrap().update(node_changes);
self.nodes.unwrapped_write().update(node_changes);
}
}
@@ -963,13 +963,13 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
},
NODE_TABLE => {
trace!(target: "network", "Refreshing node table");
self.nodes.write().unwrap().clear_useless();
self.nodes.unwrapped_write().clear_useless();
},
_ => match self.timers.read().unwrap().get(&token).cloned() {
Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() {
_ => match self.timers.unwrapped_read().get(&token).cloned() {
Some(timer) => match self.handlers.unwrapped_read().get(timer.protocol).cloned() {
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => {
let reserved = self.reserved_nodes.read().unwrap();
let reserved = self.reserved_nodes.unwrapped_read();
h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone(), &reserved), timer.token);
}
},
@@ -989,10 +989,10 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
ref versions
} => {
let h = handler.clone();
let reserved = self.reserved_nodes.read().unwrap();
let reserved = self.reserved_nodes.unwrapped_read();
h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved));
self.handlers.write().unwrap().insert(protocol, h);
let mut info = self.info.write().unwrap();
self.handlers.unwrapped_write().insert(protocol, h);
let mut info = self.info.unwrapped_write();
for v in versions {
info.capabilities.push(CapabilityInfo { protocol: protocol, version: *v, packet_count:0 });
}
@@ -1003,17 +1003,17 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
ref token,
} => {
let handler_token = {
let mut timer_counter = self.timer_counter.write().unwrap();
let mut timer_counter = self.timer_counter.unwrapped_write();
let counter = &mut *timer_counter;
let handler_token = *counter;
*counter += 1;
handler_token
};
self.timers.write().unwrap().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
self.timers.unwrapped_write().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
io.register_timer(handler_token, *delay).unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e));
},
NetworkIoMessage::Disconnect(ref peer) => {
let session = { self.sessions.read().unwrap().get(*peer).cloned() };
let session = { self.sessions.unwrapped_read().get(*peer).cloned() };
if let Some(session) = session {
session.locked().disconnect(io, DisconnectReason::DisconnectRequested);
}
@@ -1021,19 +1021,19 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
self.kill_connection(*peer, io, false);
},
NetworkIoMessage::DisablePeer(ref peer) => {
let session = { self.sessions.read().unwrap().get(*peer).cloned() };
let session = { self.sessions.unwrapped_read().get(*peer).cloned() };
if let Some(session) = session {
session.locked().disconnect(io, DisconnectReason::DisconnectRequested);
if let Some(id) = session.locked().id() {
self.nodes.write().unwrap().mark_as_useless(id)
self.nodes.unwrapped_write().mark_as_useless(id)
}
}
trace!(target: "network", "Disabling peer {}", peer);
self.kill_connection(*peer, io, false);
},
NetworkIoMessage::User(ref message) => {
let reserved = self.reserved_nodes.read().unwrap();
for (p, h) in self.handlers.read().unwrap().iter() {
let reserved = self.reserved_nodes.unwrapped_read();
for (p, h) in self.handlers.unwrapped_read().iter() {
h.message(&NetworkContext::new(io, p, None, self.sessions.clone(), &reserved), &message);
}
}
@@ -1044,7 +1044,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
fn register_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
match stream {
FIRST_SESSION ... LAST_SESSION => {
let session = { self.sessions.read().unwrap().get(stream).cloned() };
let session = { self.sessions.unwrapped_read().get(stream).cloned() };
if let Some(session) = session {
session.locked().register_socket(reg, event_loop).expect("Error registering socket");
}
@@ -1058,7 +1058,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
fn deregister_stream(&self, stream: StreamToken, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
match stream {
FIRST_SESSION ... LAST_SESSION => {
let mut connections = self.sessions.write().unwrap();
let mut connections = self.sessions.unwrapped_write();
if let Some(connection) = connections.get(stream).cloned() {
connection.locked().deregister_socket(event_loop).expect("Error deregistering socket");
connections.remove(stream);
@@ -1072,7 +1072,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
match stream {
FIRST_SESSION ... LAST_SESSION => {
let connection = { self.sessions.read().unwrap().get(stream).cloned() };
let connection = { self.sessions.unwrapped_read().get(stream).cloned() };
if let Some(connection) = connection {
connection.locked().update_socket(reg, event_loop).expect("Error updating socket");
}

View File

@@ -17,6 +17,7 @@
use std::sync::*;
use error::*;
use panics::*;
use misc::RwLockable;
use network::{NetworkProtocolHandler, NetworkConfiguration};
use network::error::NetworkError;
use network::host::{Host, NetworkIoMessage, ProtocolId};
@@ -80,19 +81,19 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
/// Returns external url if available.
pub fn external_url(&self) -> Option<String> {
let host = self.host.read().unwrap();
let host = self.host.unwrapped_read();
host.as_ref().and_then(|h| h.external_url())
}
/// Returns external url if available.
pub fn local_url(&self) -> Option<String> {
let host = self.host.read().unwrap();
let host = self.host.unwrapped_read();
host.as_ref().map(|h| h.local_url())
}
/// Start network IO
pub fn start(&self) -> Result<(), UtilError> {
let mut host = self.host.write().unwrap();
let mut host = self.host.unwrapped_write();
if host.is_none() {
let h = Arc::new(try!(Host::new(self.config.clone(), self.stats.clone())));
try!(self.io_service.register_handler(h.clone()));
@@ -103,7 +104,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
/// Stop network IO
pub fn stop(&self) -> Result<(), UtilError> {
let mut host = self.host.write().unwrap();
let mut host = self.host.unwrapped_write();
if let Some(ref host) = *host {
let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host
try!(host.stop(&io));
@@ -114,7 +115,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
/// Try to add a reserved peer.
pub fn add_reserved_peer(&self, peer: &str) -> Result<(), UtilError> {
let host = self.host.read().unwrap();
let host = self.host.unwrapped_read();
if let Some(ref host) = *host {
host.add_reserved_node(peer)
} else {
@@ -124,7 +125,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
/// Try to remove a reserved peer.
pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), UtilError> {
let host = self.host.read().unwrap();
let host = self.host.unwrapped_read();
if let Some(ref host) = *host {
host.remove_reserved_node(peer)
} else {
@@ -134,7 +135,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
/// Set the non-reserved peer mode.
pub fn set_non_reserved_mode(&self, mode: ::network::NonReservedPeerMode) {
let host = self.host.read().unwrap();
let host = self.host.unwrapped_read();
if let Some(ref host) = *host {
let io_ctxt = IoContext::new(self.io_service.channel(), 0);
host.set_non_reserved_mode(mode, &io_ctxt);

View File

@@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::thread;
use std::time::*;
use common::*;
use misc::*;
use network::*;
use io::TimerToken;
use crypto::KeyPair;

View File

@@ -120,46 +120,49 @@ impl<F> OnPanicListener for F
#[ignore] // panic forwarding doesnt work on the same thread in beta
fn should_notify_listeners_about_panic () {
use std::sync::RwLock;
use misc::RwLockable;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new();
p.on_panic(move |t| i.write().unwrap().push(t));
p.on_panic(move |t| i.unwrapped_write().push(t));
// when
p.catch_panic(|| panic!("Panic!")).unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic!");
assert!(invocations.unwrapped_read()[0] == "Panic!");
}
#[test]
#[ignore] // panic forwarding doesnt work on the same thread in beta
fn should_notify_listeners_about_panic_when_string_is_dynamic () {
use std::sync::RwLock;
use misc::RwLockable;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new();
p.on_panic(move |t| i.write().unwrap().push(t));
p.on_panic(move |t| i.unwrapped_write().push(t));
// when
p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic: 1");
assert!(invocations.unwrapped_read()[0] == "Panic: 1");
}
#[test]
fn should_notify_listeners_about_panic_in_other_thread () {
use std::thread;
use std::sync::RwLock;
use misc::RwLockable;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new();
p.on_panic(move |t| i.write().unwrap().push(t));
p.on_panic(move |t| i.unwrapped_write().push(t));
// when
let t = thread::spawn(move ||
@@ -168,18 +171,20 @@ fn should_notify_listeners_about_panic_in_other_thread () {
t.join().unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic!");
assert!(invocations.unwrapped_read()[0] == "Panic!");
}
#[test]
#[ignore] // panic forwarding doesnt work on the same thread in beta
fn should_forward_panics () {
use std::sync::RwLock;
use misc::RwLockable;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new_in_arc();
p.on_panic(move |t| i.write().unwrap().push(t));
p.on_panic(move |t| i.unwrapped_write().push(t));
let p2 = PanicHandler::new();
p.forward_from(&p2);
@@ -188,5 +193,5 @@ use std::sync::RwLock;
p2.catch_panic(|| panic!("Panic!")).unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic!");
assert!(invocations.unwrapped_read()[0] == "Panic!");
}