Reduced thread contention

This commit is contained in:
arkpar 2016-02-14 02:11:55 +01:00
parent 9768fddb19
commit 2d89708ea8

View File

@ -215,7 +215,8 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
/// Send a packet over the network to another peer.
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
if let Some(connection) = self.connections.read().unwrap().get(peer).cloned() {
let connection = { self.connections.read().unwrap().get(peer).cloned() };
if let Some(connection) = connection {
match *connection.lock().unwrap().deref_mut() {
ConnectionEntry::Session(ref mut s) => {
s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| {
@ -264,7 +265,8 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
/// Returns peer identification string
pub fn peer_info(&self, peer: PeerId) -> String {
if let Some(connection) = self.connections.read().unwrap().get(peer).cloned() {
let connection = { self.connections.read().unwrap().get(peer).cloned() };
if let Some(connection) = connection {
if let ConnectionEntry::Session(ref s) = *connection.lock().unwrap().deref() {
return s.info.client_version.clone()
}
@ -525,7 +527,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn connection_writable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
let mut create_session = false;
let mut kill = false;
if let Some(connection) = self.connections.read().unwrap().get(token).cloned() {
let connection = { self.connections.read().unwrap().get(token).cloned() };
if let Some(connection) = connection {
match *connection.lock().unwrap().deref_mut() {
ConnectionEntry::Handshake(ref mut h) => {
match h.writable(io, &self.info.read().unwrap()) {
@ -569,7 +572,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut packet_data: Option<(ProtocolId, PacketId, Vec<u8>)> = None;
let mut create_session = false;
let mut kill = false;
if let Some(connection) = self.connections.read().unwrap().get(token).cloned() {
let connection = { self.connections.read().unwrap().get(token).cloned() };
if let Some(connection) = connection {
match *connection.lock().unwrap().deref_mut() {
ConnectionEntry::Handshake(ref mut h) => {
if let Err(e) = h.readable(io, &self.info.read().unwrap()) {
@ -628,20 +632,28 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn start_session(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
let mut connections = self.connections.write().unwrap();
if connections.get(token).is_none() {
return; // handshake expired
let replace = {
let connection = { connections.get(token).cloned() };
if let Some(connection) = connection {
match *connection.lock().unwrap().deref_mut() {
ConnectionEntry::Handshake(_) => true,
_ => false,
}
} else { false }
};
if replace {
connections.replace_with(token, |c| {
match Arc::try_unwrap(c).ok().unwrap().into_inner().unwrap() {
ConnectionEntry::Handshake(h) => {
let session = Session::new(h, io, &self.info.read().unwrap()).expect("Session creation error");
io.update_registration(token).expect("Error updating session registration");
self.stats.inc_sessions();
Some(Arc::new(Mutex::new(ConnectionEntry::Session(session))))
},
_ => { None } // handshake expired
}
}).ok();
}
connections.replace_with(token, |c| {
match Arc::try_unwrap(c).ok().unwrap().into_inner().unwrap() {
ConnectionEntry::Handshake(h) => {
let session = Session::new(h, io, &self.info.read().unwrap()).expect("Session creation error");
io.update_registration(token).expect("Error updating session registration");
self.stats.inc_sessions();
Some(Arc::new(Mutex::new(ConnectionEntry::Session(session))))
},
_ => { None } // handshake expired
}
}).ok();
}
fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
@ -650,15 +662,14 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn kill_connection(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>, remote: bool) {
let mut to_disconnect: Vec<ProtocolId> = Vec::new();
let mut failure_id = None;
{
let mut connections = self.connections.write().unwrap();
if let Some(connection) = connections.get(token).cloned() {
match *connection.lock().unwrap().deref_mut() {
ConnectionEntry::Handshake(ref h) => {
connections.remove(token);
if remote {
self.nodes.write().unwrap().note_failure(h.id());
}
failure_id = Some(h.id().clone());
},
ConnectionEntry::Session(ref mut s) if s.is_ready() => {
for (p, _) in self.handlers.read().unwrap().iter() {
@ -667,15 +678,18 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
}
connections.remove(token);
if remote {
self.nodes.write().unwrap().note_failure(s.id());
}
failure_id = Some(s.id().clone());
},
_ => {},
}
}
io.deregister_stream(token).expect("Error deregistering stream");
}
if let Some(id) = failure_id {
if remote {
self.nodes.write().unwrap().note_failure(&id);
}
}
for p in to_disconnect {
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
h.disconnected(&NetworkContext::new(io, p, Some(token), self.connections.clone()), &token);
@ -683,18 +697,20 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
fn update_nodes(&self, io: &IoContext<NetworkIoMessage<Message>>, node_changes: TableUpdates) {
let connections = self.connections.write().unwrap();
let mut to_remove: Vec<PeerId> = Vec::new();
for c in connections.iter() {
match *c.lock().unwrap().deref_mut() {
ConnectionEntry::Handshake(ref h) => {
if node_changes.removed.contains(&h.id()) {
to_remove.push(h.token());
{
let connections = self.connections.write().unwrap();
for c in connections.iter() {
match *c.lock().unwrap().deref_mut() {
ConnectionEntry::Handshake(ref h) => {
if node_changes.removed.contains(&h.id()) {
to_remove.push(h.token());
}
}
}
ConnectionEntry::Session(ref s) => {
if node_changes.removed.contains(&s.id()) {
to_remove.push(s.token());
ConnectionEntry::Session(ref s) => {
if node_changes.removed.contains(&s.id()) {
to_remove.push(s.token());
}
}
}
}
@ -804,7 +820,8 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
io.register_timer(handler_token, *delay).expect("Error registering timer");
},
NetworkIoMessage::Disconnect(ref peer) => {
if let Some(connection) = self.connections.read().unwrap().get(*peer).cloned() {
let connection = { self.connections.read().unwrap().get(*peer).cloned() };
if let Some(connection) = connection {
match *connection.lock().unwrap().deref_mut() {
ConnectionEntry::Handshake(_) => {},
ConnectionEntry::Session(ref mut s) => { s.disconnect(DisconnectReason::DisconnectRequested); }
@ -823,7 +840,8 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
fn register_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
match stream {
FIRST_CONNECTION ... LAST_CONNECTION => {
if let Some(connection) = self.connections.read().unwrap().get(stream).cloned() {
let connection = { self.connections.read().unwrap().get(stream).cloned() };
if let Some(connection) = connection {
match *connection.lock().unwrap().deref() {
ConnectionEntry::Handshake(ref h) => h.register_socket(reg, event_loop).expect("Error registering socket"),
ConnectionEntry::Session(_) => warn!("Unexpected session stream registration")
@ -857,7 +875,8 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
match stream {
FIRST_CONNECTION ... LAST_CONNECTION => {
if let Some(connection) = self.connections.read().unwrap().get(stream).cloned() {
let connection = { self.connections.read().unwrap().get(stream).cloned() };
if let Some(connection) = connection {
match *connection.lock().unwrap().deref() {
ConnectionEntry::Handshake(ref h) => h.update_socket(reg, event_loop).expect("Error updating socket"),
ConnectionEntry::Session(ref s) => s.update_socket(reg, event_loop).expect("Error updating socket"),