mio version bump (#2982)

This commit is contained in:
Arkadiy Paronyan
2016-10-30 09:56:34 +01:00
committed by Gav Wood
parent 70f87ea002
commit bccc56b6b0
14 changed files with 111 additions and 64 deletions

View File

@@ -7,7 +7,7 @@ version = "1.4.0"
authors = ["Ethcore <admin@ethcore.io>"]
[dependencies]
mio = { git = "https://github.com/ethcore/mio", branch = "v0.5.x" }
mio = { git = "https://github.com/carllerche/mio" }
crossbeam = "0.2"
parking_lot = "0.3"
log = "0.3"

View File

@@ -65,7 +65,8 @@ mod service;
mod worker;
mod panics;
use mio::{EventLoop, Token};
use mio::{Token};
use mio::deprecated::{EventLoop, NotifyError};
use std::fmt;
pub use worker::LOCAL_STACK_SIZE;
@@ -96,8 +97,8 @@ impl From<::std::io::Error> for IoError {
}
}
impl<Message> From<::mio::NotifyError<service::IoMessage<Message>>> for IoError where Message: Send + Clone {
fn from(_err: ::mio::NotifyError<service::IoMessage<Message>>) -> IoError {
impl<Message> From<NotifyError<service::IoMessage<Message>>> for IoError where Message: Send + Clone {
fn from(_err: NotifyError<service::IoMessage<Message>>) -> IoError {
IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error"))
}
}

View File

@@ -18,13 +18,16 @@ use std::sync::{Arc, Weak};
use std::thread::{self, JoinHandle};
use std::collections::HashMap;
use mio::*;
use mio::timer::{Timeout};
use mio::deprecated::{EventLoop, Handler, Sender, EventLoopBuilder};
use crossbeam::sync::chase_lev;
use slab::Slab;
use {IoError, IoHandler};
use worker::{Worker, Work, WorkType};
use panics::*;
use parking_lot::{RwLock};
use parking_lot::{RwLock, Mutex};
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
use std::time::Duration;
/// Timer ID
pub type TimerToken = usize;
@@ -209,9 +212,9 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
type Timeout = Token;
type Message = IoMessage<Message>;
fn ready(&mut self, _event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
fn ready(&mut self, _event_loop: &mut EventLoop<Self>, token: Token, events: Ready) {
let handler_index = token.0 / TOKENS_PER_HANDLER;
let token_id = token.0 % TOKENS_PER_HANDLER;
if let Some(handler) = self.handlers.read().get(handler_index) {
if events.is_hup() {
self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index });
@@ -229,11 +232,11 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
}
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
let handler_index = token.0 / TOKENS_PER_HANDLER;
let token_id = token.0 % TOKENS_PER_HANDLER;
if let Some(handler) = self.handlers.read().get(handler_index) {
if let Some(timer) = self.timers.read().get(&token.as_usize()) {
event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer");
if let Some(timer) = self.timers.read().get(&token.0) {
event_loop.timeout(token, Duration::from_millis(timer.delay)).expect("Error re-registering user timer");
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index });
self.work_ready.notify_all();
}
@@ -258,18 +261,18 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
let to_remove: Vec<_> = timers.keys().cloned().filter(|timer_id| timer_id / TOKENS_PER_HANDLER == handler_id).collect();
for timer_id in to_remove {
let timer = timers.remove(&timer_id).expect("to_remove only contains keys from timers; qed");
event_loop.clear_timeout(timer.timeout);
event_loop.clear_timeout(&timer.timeout);
}
},
IoMessage::AddTimer { handler_id, token, delay } => {
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
let timeout = event_loop.timeout_ms(Token(timer_id), delay).expect("Error registering user timer");
let timeout = event_loop.timeout(Token(timer_id), Duration::from_millis(delay)).expect("Error registering user timer");
self.timers.write().insert(timer_id, UserTimer { delay: delay, timeout: timeout });
},
IoMessage::RemoveTimer { handler_id, token } => {
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
if let Some(timer) = self.timers.write().remove(&timer_id) {
event_loop.clear_timeout(timer.timeout);
event_loop.clear_timeout(&timer.timeout);
}
},
IoMessage::RegisterStream { handler_id, token } => {
@@ -283,7 +286,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
// unregister a timer associated with the token (if any)
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
if let Some(timer) = self.timers.write().remove(&timer_id) {
event_loop.clear_timeout(timer.timeout);
event_loop.clear_timeout(&timer.timeout);
}
}
},
@@ -372,7 +375,7 @@ impl<Message> IoChannel<Message> where Message: Send + Clone + Sync + 'static {
pub struct IoService<Message> where Message: Send + Sync + Clone + 'static {
panic_handler: Arc<PanicHandler>,
thread: Option<JoinHandle<()>>,
host_channel: Sender<IoMessage<Message>>,
host_channel: Mutex<Sender<IoMessage<Message>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
}
@@ -386,9 +389,9 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
/// Starts IO event loop
pub fn start() -> Result<IoService<Message>, IoError> {
let panic_handler = PanicHandler::new_in_arc();
let mut config = EventLoopConfig::new();
let mut config = EventLoopBuilder::new();
config.messages_per_tick(1024);
let mut event_loop = EventLoop::configured(config).expect("Error creating event loop");
let mut event_loop = config.build().expect("Error creating event loop");
let channel = event_loop.channel();
let panic = panic_handler.clone();
let handlers = Arc::new(RwLock::new(Slab::new(MAX_HANDLERS)));
@@ -402,14 +405,14 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
Ok(IoService {
panic_handler: panic_handler,
thread: Some(thread),
host_channel: channel,
host_channel: Mutex::new(channel),
handlers: handlers,
})
}
/// Regiter an IO handler with the event loop.
pub fn register_handler(&self, handler: Arc<IoHandler<Message>+Send>) -> Result<(), IoError> {
try!(self.host_channel.send(IoMessage::AddHandler {
try!(self.host_channel.lock().send(IoMessage::AddHandler {
handler: handler,
}));
Ok(())
@@ -417,20 +420,20 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
/// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads.
pub fn send_message(&self, message: Message) -> Result<(), IoError> {
try!(self.host_channel.send(IoMessage::UserMessage(message)));
try!(self.host_channel.lock().send(IoMessage::UserMessage(message)));
Ok(())
}
/// Create a new message channel
pub fn channel(&self) -> IoChannel<Message> {
IoChannel::new(self.host_channel.clone(), Arc::downgrade(&self.handlers))
IoChannel::new(self.host_channel.lock().clone(), Arc::downgrade(&self.handlers))
}
}
impl<Message> Drop for IoService<Message> where Message: Send + Sync + Clone {
fn drop(&mut self) {
trace!(target: "shutdown", "[IoService] Closing...");
self.host_channel.send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
self.thread.take().unwrap().join().ok();
trace!(target: "shutdown", "[IoService] Closed.");
}

View File

@@ -8,7 +8,8 @@ authors = ["Ethcore <admin@ethcore.io>"]
[dependencies]
log = "0.3"
mio = { git = "https://github.com/ethcore/mio", branch = "v0.5.x" }
mio = { git = "https://github.com/carllerche/mio" }
bytes = "0.3.0"
rand = "0.3.12"
time = "0.1.34"
tiny-keccak = "1.0"

View File

@@ -18,7 +18,8 @@ use std::sync::Arc;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use mio::{Handler, Token, EventSet, EventLoop, PollOpt, TryRead, TryWrite};
use mio::{Token, Ready, PollOpt};
use mio::deprecated::{Handler, EventLoop, TryRead, TryWrite};
use mio::tcp::*;
use util::hash::*;
use util::sha3::*;
@@ -34,6 +35,7 @@ use rcrypto::aessafe::*;
use rcrypto::symmetriccipher::*;
use rcrypto::buffer::*;
use tiny_keccak::Keccak;
use bytes::{Buf, MutBuf};
use crypto;
const ENCRYPTED_HEADER_LEN: usize = 32;
@@ -57,7 +59,7 @@ pub struct GenericConnection<Socket: GenericSocket> {
/// Send out packets FIFO
send_queue: VecDeque<Cursor<Bytes>>,
/// Event flags this connection expects
interest: EventSet,
interest: Ready,
/// Shared network statistics
stats: Arc<NetworkStats>,
/// Registered flag
@@ -81,8 +83,9 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
let sock_ref = <Socket as Read>::by_ref(&mut self.socket);
loop {
let max = self.rec_size - self.rec_buf.len();
match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) {
match sock_ref.take(max as u64).try_read(unsafe { self.rec_buf.mut_bytes() }) {
Ok(Some(size)) if size != 0 => {
unsafe { self.rec_buf.advance(size); }
self.stats.inc_recv(size);
trace!(target:"network", "{}: Read {} of {} bytes", self.token, self.rec_buf.len(), self.rec_size);
if self.rec_size != 0 && self.rec_buf.len() == self.rec_size {
@@ -109,7 +112,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
trace!(target:"network", "{}: Sending {} bytes", self.token, data.len());
self.send_queue.push_back(Cursor::new(data));
if !self.interest.is_writable() {
self.interest.insert(EventSet::writable());
self.interest.insert(Ready::writable());
}
io.update_registration(self.token).ok();
}
@@ -128,16 +131,19 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
{
let buf = self.send_queue.front_mut().unwrap();
let send_size = buf.get_ref().len();
if (buf.position() as usize) >= send_size {
let pos = buf.position() as usize;
if (pos as usize) >= send_size {
warn!(target:"net", "Unexpected connection data");
return Ok(WriteStatus::Complete)
}
match self.socket.try_write_buf(buf) {
Ok(Some(size)) if (buf.position() as usize) < send_size => {
let buf = buf as &mut Buf;
match self.socket.try_write(buf.bytes()) {
Ok(Some(size)) if (pos + size) < send_size => {
buf.advance(size);
self.stats.inc_send(size);
Ok(WriteStatus::Ongoing)
},
Ok(Some(size)) if (buf.position() as usize) == send_size => {
Ok(Some(size)) if (pos + size) == send_size => {
self.stats.inc_send(size);
trace!(target:"network", "{}: Wrote {} bytes", self.token, send_size);
Ok(WriteStatus::Complete)
@@ -151,7 +157,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
self.send_queue.pop_front();
}
if self.send_queue.is_empty() {
self.interest.remove(EventSet::writable());
self.interest.remove(Ready::writable());
try!(io.update_registration(self.token));
}
Ok(r)
@@ -171,7 +177,7 @@ impl Connection {
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup() | EventSet::readable(),
interest: Ready::hup() | Ready::readable(),
stats: stats,
registered: AtomicBool::new(false),
}
@@ -205,7 +211,7 @@ impl Connection {
rec_buf: Vec::new(),
rec_size: 0,
send_queue: self.send_queue.clone(),
interest: EventSet::hup(),
interest: Ready::hup(),
stats: self.stats.clone(),
registered: AtomicBool::new(false),
})
@@ -499,7 +505,7 @@ mod tests {
use std::sync::atomic::AtomicBool;
use super::super::stats::*;
use std::io::{Read, Write, Error, Cursor, ErrorKind};
use mio::{EventSet};
use mio::{Ready};
use std::collections::VecDeque;
use util::bytes::*;
use devtools::*;
@@ -545,7 +551,7 @@ mod tests {
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup() | EventSet::readable(),
interest: Ready::hup() | Ready::readable(),
stats: Arc::<NetworkStats>::new(NetworkStats::new()),
registered: AtomicBool::new(false),
}
@@ -568,7 +574,7 @@ mod tests {
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup() | EventSet::readable(),
interest: Ready::hup() | Ready::readable(),
stats: Arc::<NetworkStats>::new(NetworkStats::new()),
registered: AtomicBool::new(false),
}

View File

@@ -20,6 +20,7 @@ use std::collections::{HashSet, HashMap, BTreeMap, VecDeque};
use std::mem;
use std::default::Default;
use mio::*;
use mio::deprecated::{Handler, EventLoop};
use mio::udp::*;
use util::sha3::*;
use time;
@@ -108,7 +109,7 @@ pub struct TableUpdates {
impl Discovery {
pub fn new(key: &KeyPair, listen: SocketAddr, public: NodeEndpoint, token: StreamToken, allow_ips: AllowIP) -> Discovery {
let socket = UdpSocket::bound(&listen).expect("Error binding UDP socket");
let socket = UdpSocket::bind(&listen).expect("Error binding UDP socket");
Discovery {
id: key.public().clone(),
id_hash: key.public().sha3(),
@@ -532,15 +533,15 @@ impl Discovery {
}
pub fn register_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> {
event_loop.register(&self.udp_socket, Token(self.token), EventSet::all(), PollOpt::edge()).expect("Error registering UDP socket");
event_loop.register(&self.udp_socket, Token(self.token), Ready::all(), PollOpt::edge()).expect("Error registering UDP socket");
Ok(())
}
pub fn update_registration<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> {
let registration = if !self.send_queue.is_empty() {
EventSet::readable() | EventSet::writable()
Ready::readable() | Ready::writable()
} else {
EventSet::readable()
Ready::readable()
};
event_loop.reregister(&self.udp_socket, Token(self.token), registration, PollOpt::edge()).expect("Error reregistering UDP socket");
Ok(())

View File

@@ -26,6 +26,7 @@ use std::io::{Read, Write};
use std::fs;
use ethkey::{KeyPair, Secret, Random, Generator};
use mio::*;
use mio::deprecated::{EventLoop};
use mio::tcp::*;
use util::hash::*;
use util::Hashable;
@@ -744,8 +745,7 @@ impl Host {
trace!(target: "network", "Accepting incoming connection");
loop {
let socket = match self.tcp_listener.lock().accept() {
Ok(None) => break,
Ok(Some((sock, _addr))) => sock,
Ok((sock, _addr)) => sock,
Err(e) => {
warn!("Error accepting connection: {:?}", e);
break
@@ -1101,7 +1101,7 @@ impl IoHandler<NetworkIoMessage> for Host {
}
}
DISCOVERY => self.discovery.lock().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"),
_ => warn!("Unexpected stream registration")
}
}
@@ -1129,7 +1129,7 @@ impl IoHandler<NetworkIoMessage> for Host {
}
}
DISCOVERY => self.discovery.lock().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"),
_ => warn!("Unexpected stream update")
}
}

View File

@@ -70,6 +70,7 @@ extern crate slab;
extern crate ethkey;
extern crate ethcrypto as crypto;
extern crate rlp;
extern crate bytes;
#[macro_use]
extern crate log;

View File

@@ -19,6 +19,7 @@ use std::net::SocketAddr;
use std::cmp::Ordering;
use std::sync::*;
use mio::*;
use mio::deprecated::{Handler, EventLoop};
use mio::tcp::*;
use util::hash::*;
use rlp::*;