Merge branch 'master' into txqueue-gc

Conflicts:
	ethcore/src/miner/miner.rs
This commit is contained in:
Tomasz Drwięga
2016-12-10 17:00:29 +01:00
203 changed files with 4496 additions and 1632 deletions

View File

@@ -394,7 +394,7 @@ impl<Message> IoChannel<Message> where Message: Send + Clone + Sync + 'static {
/// 'Message' is a notification message type
pub struct IoService<Message> where Message: Send + Sync + Clone + 'static {
panic_handler: Arc<PanicHandler>,
thread: Option<JoinHandle<()>>,
thread: Mutex<Option<JoinHandle<()>>>,
host_channel: Mutex<Sender<IoMessage<Message>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
}
@@ -424,12 +424,26 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
});
Ok(IoService {
panic_handler: panic_handler,
thread: Some(thread),
thread: Mutex::new(Some(thread)),
host_channel: Mutex::new(channel),
handlers: handlers,
})
}
pub fn stop(&self) {
trace!(target: "shutdown", "[IoService] Closing...");
// Clear handlers so that shared pointers are not stuck on stack
// in Channel::send_sync
self.handlers.write().clear();
self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
if let Some(thread) = self.thread.lock().take() {
thread.join().unwrap_or_else(|e| {
debug!(target: "shutdown", "Error joining IO service event loop thread: {:?}", e);
});
}
trace!(target: "shutdown", "[IoService] Closed.");
}
/// Regiter an IO handler with the event loop.
pub fn register_handler(&self, handler: Arc<IoHandler<Message>+Send>) -> Result<(), IoError> {
try!(self.host_channel.lock().send(IoMessage::AddHandler {
@@ -452,17 +466,7 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
impl<Message> Drop for IoService<Message> where Message: Send + Sync + Clone {
fn drop(&mut self) {
trace!(target: "shutdown", "[IoService] Closing...");
// Clear handlers so that shared pointers are not stuck on stack
// in Channel::send_sync
self.handlers.write().clear();
self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
if let Some(thread) = self.thread.take() {
thread.join().unwrap_or_else(|e| {
debug!(target: "shutdown", "Error joining IO service event loop thread: {:?}", e);
});
}
trace!(target: "shutdown", "[IoService] Closed.");
self.stop()
}
}

View File

@@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::ops::*;
use std::cmp::min;
use std::path::{Path, PathBuf};
use std::io::{Read, Write};
use std::io::{Read, Write, ErrorKind};
use std::fs;
use ethkey::{KeyPair, Secret, Random, Generator};
use mio::*;
@@ -381,8 +381,6 @@ pub struct Host {
impl Host {
/// Create a new instance
pub fn new(mut config: NetworkConfiguration, stats: Arc<NetworkStats>) -> Result<Host, NetworkError> {
trace!(target: "host", "Creating new Host object");
let mut listen_address = match config.listen_address {
None => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), DEFAULT_PORT)),
Some(addr) => addr,
@@ -405,6 +403,7 @@ impl Host {
// Setup the server socket
let tcp_listener = try!(TcpListener::bind(&listen_address));
listen_address = SocketAddr::new(listen_address.ip(), try!(tcp_listener.local_addr()).port());
debug!(target: "network", "Listening at {:?}", listen_address);
let udp_port = config.udp_port.unwrap_or(listen_address.port());
let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port };
@@ -707,7 +706,10 @@ impl Host {
}
};
match TcpStream::connect(&address) {
Ok(socket) => socket,
Ok(socket) => {
trace!(target: "network", "Connecting to {:?}", address);
socket
},
Err(e) => {
debug!(target: "network", "Can't connect to address {:?}: {:?}", address, e);
return;
@@ -749,7 +751,9 @@ impl Host {
let socket = match self.tcp_listener.lock().accept() {
Ok((sock, _addr)) => sock,
Err(e) => {
debug!(target: "network", "Error accepting connection: {:?}", e);
if e.kind() != ErrorKind::WouldBlock {
debug!(target: "network", "Error accepting connection: {:?}", e);
}
break
},
};
@@ -868,6 +872,12 @@ impl Host {
let reserved = self.reserved_nodes.read();
if let Some(h) = handlers.get(&p).clone() {
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
// accumulate pending packets.
if let Some(session) = session.as_ref() {
let mut session = session.lock();
packet_data.extend(session.mark_connected(p));
}
}
}
for (p, packet_id, data) in packet_data {

View File

@@ -18,6 +18,8 @@ use std::{str, io};
use std::net::SocketAddr;
use std::cmp::Ordering;
use std::sync::*;
use std::collections::HashMap;
use mio::*;
use mio::deprecated::{Handler, EventLoop};
use mio::tcp::*;
@@ -36,6 +38,14 @@ use time;
const PING_TIMEOUT_SEC: u64 = 15;
const PING_INTERVAL_SEC: u64 = 30;
#[derive(Debug, Clone)]
enum ProtocolState {
// Packets pending protocol on_connect event return.
Pending(Vec<(Vec<u8>, u8)>),
// Protocol connected.
Connected,
}
/// Peer session over encrypted connection.
/// When created waits for Hello packet exchange and signals ready state.
/// Sends and receives protocol packets and handles basic packes such as ping/pong and disconnect.
@@ -49,6 +59,8 @@ pub struct Session {
ping_time_ns: u64,
pong_time_ns: Option<u64>,
state: State,
// Protocol states -- accumulates pending packets until signaled as ready.
protocol_states: HashMap<ProtocolId, ProtocolState>,
}
enum State {
@@ -186,6 +198,7 @@ impl Session {
ping_time_ns: 0,
pong_time_ns: None,
expired: false,
protocol_states: HashMap::new(),
})
}
@@ -361,6 +374,20 @@ impl Session {
self.connection().token()
}
/// Signal that a subprotocol has handled the connection successfully and
/// get all pending packets in order received.
pub fn mark_connected(&mut self, protocol: ProtocolId) -> Vec<(ProtocolId, u8, Vec<u8>)> {
match self.protocol_states.insert(protocol, ProtocolState::Connected) {
None => Vec::new(),
Some(ProtocolState::Connected) => {
debug!(target: "network", "Protocol {:?} marked as connected more than once", protocol);
Vec::new()
}
Some(ProtocolState::Pending(pending)) =>
pending.into_iter().map(|(data, id)| (protocol, id, data)).collect(),
}
}
fn read_packet<Message>(&mut self, io: &IoContext<Message>, packet: Packet, host: &HostInfo) -> Result<SessionData, NetworkError>
where Message: Send + Sync + Clone {
if packet.data.len() < 2 {
@@ -408,9 +435,20 @@ impl Session {
// map to protocol
let protocol = self.info.capabilities[i].protocol;
let pid = packet_id - self.info.capabilities[i].id_offset;
trace!(target: "network", "Packet {} mapped to {:?}:{}, i={}, capabilities={:?}", packet_id, protocol, pid, i, self.info.capabilities);
Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } )
let protocol_packet_id = packet_id - self.info.capabilities[i].id_offset;
match *self.protocol_states.entry(protocol).or_insert_with(|| ProtocolState::Pending(Vec::new())) {
ProtocolState::Connected => {
trace!(target: "network", "Packet {} mapped to {:?}:{}, i={}, capabilities={:?}", packet_id, protocol, protocol_packet_id, i, self.info.capabilities);
Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: protocol_packet_id } )
}
ProtocolState::Pending(ref mut pending) => {
trace!(target: "network", "Packet {} deferred until protocol connection event completion", packet_id);
pending.push((packet.data, protocol_packet_id));
Ok(SessionData::Continue)
}
}
},
_ => {
debug!(target: "network", "Unknown packet: {:?}", packet_id);