diff --git a/Cargo.lock b/Cargo.lock index a4fc98ded..8435e4f6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -370,7 +370,7 @@ version = "1.4.0" dependencies = [ "crossbeam 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.5.1 (git+https://github.com/ethcore/mio?branch=v0.5.x)", + "mio 0.6.0 (git+https://github.com/carllerche/mio)", "parking_lot 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -451,6 +451,7 @@ name = "ethcore-network" version = "1.4.0" dependencies = [ "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-devtools 1.4.0", "ethcore-io 1.4.0", "ethcore-util 1.4.0", @@ -459,7 +460,7 @@ dependencies = [ "igd 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.5.1 (git+https://github.com/ethcore/mio?branch=v0.5.x)", + "mio 0.6.0 (git+https://github.com/carllerche/mio)", "parking_lot 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.1.0", @@ -886,6 +887,11 @@ name = "lazy_static" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "lazycell" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "libc" version = "0.2.15" @@ -1002,6 +1008,22 @@ dependencies = [ "winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "mio" +version = "0.6.0" +source = "git+https://github.com/carllerche/mio#9f17b70d6fecbf912168267ea74cf536f2cba705" +dependencies = [ + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "net2 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)", + "nix 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "miow" version = "0.1.3" @@ -1593,6 +1615,11 @@ name = "slab" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "slab" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "smallvec" version = "0.1.8" @@ -1984,6 +2011,7 @@ dependencies = [ "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" "checksum lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "49247ec2a285bb3dcb23cbd9c35193c025e7251bfce77c1d5da97e6362dffe7f" +"checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b" "checksum libc 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)" = "23e3757828fa702a20072c37ff47938e9dd331b92fac6e223d26d4b7a55f7ee2" "checksum linked-hash-map 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bda158e0dabeb97ee8a401f4d17e479d6b891a14de0bba79d5cc2d4d325b5e48" "checksum linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6d262045c5b87c0861b3f004610afd0e2c851e2908d08b6c870cbb9d5f494ecd" @@ -1996,6 +2024,7 @@ dependencies = [ "checksum miniz-sys 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "9d1f4d337a01c32e1f2122510fed46393d53ca35a7f429cb0450abaedfa3ed54" "checksum mio 0.5.1 (git+https://github.com/ethcore/mio?branch=v0.5.x)" = "" "checksum mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a637d1ca14eacae06296a008fa7ad955347e34efcb5891cfd8ba05491a37907e" +"checksum mio 0.6.0 (git+https://github.com/carllerche/mio)" = "" "checksum mio 0.6.0-dev (git+https://github.com/ethcore/mio?branch=timer-fix)" = "" "checksum miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d5bfc6782530ac8ace97af10a540054a37126b63b0702ddaaa243b73b5745b9a" "checksum msdos_time 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c04b68cc63a8480fb2550343695f7be72effdec953a9d4508161c3e69041c7d8" @@ -2062,6 +2091,7 @@ dependencies = [ "checksum slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d807fd58c4181bbabed77cb3b891ba9748241a552bcc5be698faaebefc54f46e" "checksum slab 0.2.0 (git+https://github.com/carllerche/slab?rev=5476efcafb)" = "" "checksum slab 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6dbdd334bd28d328dad1c41b0ea662517883d8880d8533895ef96c8003dec9c4" +"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum smallvec 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fcc8d19212aacecf95e4a7a2179b26f7aeb9732a915cf01f05b0d3e044865410" "checksum solicit 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "172382bac9424588d7840732b250faeeef88942e37b6e35317dce98cafdd75b2" "checksum spmc 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "93bdab61c1a413e591c4d17388ffa859eaff2df27f1e13a5ec8b716700605adf" diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 4709955d1..0c50d85aa 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -139,7 +139,7 @@ pub struct Client { miner: Arc, sleep_state: Mutex, liveness: AtomicBool, - io_channel: IoChannel, + io_channel: Mutex>, notify: RwLock>>, queue_transactions: AtomicUsize, last_hashes: RwLock>, @@ -235,7 +235,7 @@ impl Client { import_lock: Mutex::new(()), panic_handler: panic_handler, miner: miner, - io_channel: message_channel, + io_channel: Mutex::new(message_channel), notify: RwLock::new(Vec::new()), queue_transactions: AtomicUsize::new(0), last_hashes: RwLock::new(VecDeque::new()), @@ -1139,7 +1139,7 @@ impl BlockChainClient for Client { debug!("Ignoring {} transactions: queue is full", transactions.len()); } else { let len = transactions.len(); - match self.io_channel.send(ClientIoMessage::NewTransactions(transactions)) { + match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions)) { Ok(_) => { self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst); } diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index cc30a5c26..16f7c6ec6 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -213,7 +213,7 @@ pub struct Service { restoration: Mutex>, snapshot_root: PathBuf, db_config: DatabaseConfig, - io_channel: Channel, + io_channel: Mutex, pruning: Algorithm, status: Mutex, reader: RwLock>, @@ -233,7 +233,7 @@ impl Service { restoration: Mutex::new(None), snapshot_root: params.snapshot_root, db_config: params.db_config, - io_channel: params.channel, + io_channel: Mutex::new(params.channel), pruning: params.pruning, status: Mutex::new(RestorationStatus::Inactive), reader: RwLock::new(None), @@ -567,7 +567,7 @@ impl SnapshotService for Service { } fn begin_restore(&self, manifest: ManifestData) { - if let Err(e) = self.io_channel.send(ClientIoMessage::BeginRestoration(manifest)) { + if let Err(e) = self.io_channel.lock().send(ClientIoMessage::BeginRestoration(manifest)) { trace!("Error sending snapshot service message: {:?}", e); } } @@ -578,13 +578,13 @@ impl SnapshotService for Service { } fn restore_state_chunk(&self, hash: H256, chunk: Bytes) { - if let Err(e) = self.io_channel.send(ClientIoMessage::FeedStateChunk(hash, chunk)) { + if let Err(e) = self.io_channel.lock().send(ClientIoMessage::FeedStateChunk(hash, chunk)) { trace!("Error sending snapshot service message: {:?}", e); } } fn restore_block_chunk(&self, hash: H256, chunk: Bytes) { - if let Err(e) = self.io_channel.send(ClientIoMessage::FeedBlockChunk(hash, chunk)) { + if let Err(e) = self.io_channel.lock().send(ClientIoMessage::FeedBlockChunk(hash, chunk)) { trace!("Error sending snapshot service message: {:?}", e); } } diff --git a/ethcore/src/snapshot/watcher.rs b/ethcore/src/snapshot/watcher.rs index 498f00737..43439e437 100644 --- a/ethcore/src/snapshot/watcher.rs +++ b/ethcore/src/snapshot/watcher.rs @@ -16,6 +16,7 @@ //! Watcher for snapshot-related chain events. +use util::Mutex; use client::{BlockChainClient, Client, ChainNotify}; use ids::BlockID; use service::ClientIoMessage; @@ -55,7 +56,7 @@ trait Broadcast: Send + Sync { fn take_at(&self, num: Option); } -impl Broadcast for IoChannel { +impl Broadcast for Mutex> { fn take_at(&self, num: Option) { let num = match num { Some(n) => n, @@ -64,7 +65,7 @@ impl Broadcast for IoChannel { trace!(target: "snapshot_watcher", "broadcast: {}", num); - if let Err(e) = self.send(ClientIoMessage::TakeSnapshot(num)) { + if let Err(e) = self.lock().send(ClientIoMessage::TakeSnapshot(num)) { warn!("Snapshot watcher disconnected from IoService: {}", e); } } @@ -91,7 +92,7 @@ impl Watcher { client: client, sync_status: sync_status, }), - broadcast: Box::new(channel), + broadcast: Box::new(Mutex::new(channel)), period: period, history: history, } diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index f801bbe2e..99e09784d 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -109,7 +109,7 @@ pub struct VerificationQueue { struct QueueSignal { deleting: Arc, signalled: AtomicBool, - message_channel: IoChannel, + message_channel: Mutex>, } impl QueueSignal { @@ -121,7 +121,8 @@ impl QueueSignal { } if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false { - if let Err(e) = self.message_channel.send_sync(ClientIoMessage::BlockVerified) { + let channel = self.message_channel.lock().clone(); + if let Err(e) = channel.send_sync(ClientIoMessage::BlockVerified) { debug!("Error sending BlockVerified message: {:?}", e); } } @@ -135,7 +136,8 @@ impl QueueSignal { } if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false { - if let Err(e) = self.message_channel.send(ClientIoMessage::BlockVerified) { + let channel = self.message_channel.lock().clone(); + if let Err(e) = channel.send(ClientIoMessage::BlockVerified) { debug!("Error sending BlockVerified message: {:?}", e); } } @@ -178,7 +180,7 @@ impl VerificationQueue { let ready_signal = Arc::new(QueueSignal { deleting: deleting.clone(), signalled: AtomicBool::new(false), - message_channel: message_channel + message_channel: Mutex::new(message_channel), }); let empty = Arc::new(SCondvar::new()); let panic_handler = PanicHandler::new_in_arc(); diff --git a/util/io/Cargo.toml b/util/io/Cargo.toml index cd3aedd91..3cd51e656 100644 --- a/util/io/Cargo.toml +++ b/util/io/Cargo.toml @@ -7,7 +7,7 @@ version = "1.4.0" authors = ["Ethcore "] [dependencies] -mio = { git = "https://github.com/ethcore/mio", branch = "v0.5.x" } +mio = { git = "https://github.com/carllerche/mio" } crossbeam = "0.2" parking_lot = "0.3" log = "0.3" diff --git a/util/io/src/lib.rs b/util/io/src/lib.rs index 082192dfa..99282d9d8 100644 --- a/util/io/src/lib.rs +++ b/util/io/src/lib.rs @@ -65,7 +65,8 @@ mod service; mod worker; mod panics; -use mio::{EventLoop, Token}; +use mio::{Token}; +use mio::deprecated::{EventLoop, NotifyError}; use std::fmt; pub use worker::LOCAL_STACK_SIZE; @@ -96,8 +97,8 @@ impl From<::std::io::Error> for IoError { } } -impl From<::mio::NotifyError>> for IoError where Message: Send + Clone { - fn from(_err: ::mio::NotifyError>) -> IoError { +impl From>> for IoError where Message: Send + Clone { + fn from(_err: NotifyError>) -> IoError { IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) } } diff --git a/util/io/src/service.rs b/util/io/src/service.rs index d06d34284..01f390a2c 100644 --- a/util/io/src/service.rs +++ b/util/io/src/service.rs @@ -18,13 +18,16 @@ use std::sync::{Arc, Weak}; use std::thread::{self, JoinHandle}; use std::collections::HashMap; use mio::*; +use mio::timer::{Timeout}; +use mio::deprecated::{EventLoop, Handler, Sender, EventLoopBuilder}; use crossbeam::sync::chase_lev; use slab::Slab; use {IoError, IoHandler}; use worker::{Worker, Work, WorkType}; use panics::*; -use parking_lot::{RwLock}; +use parking_lot::{RwLock, Mutex}; use std::sync::{Condvar as SCondvar, Mutex as SMutex}; +use std::time::Duration; /// Timer ID pub type TimerToken = usize; @@ -209,9 +212,9 @@ impl Handler for IoManager where Message: Send + Clone + Sync type Timeout = Token; type Message = IoMessage; - fn ready(&mut self, _event_loop: &mut EventLoop, token: Token, events: EventSet) { - let handler_index = token.as_usize() / TOKENS_PER_HANDLER; - let token_id = token.as_usize() % TOKENS_PER_HANDLER; + fn ready(&mut self, _event_loop: &mut EventLoop, token: Token, events: Ready) { + let handler_index = token.0 / TOKENS_PER_HANDLER; + let token_id = token.0 % TOKENS_PER_HANDLER; if let Some(handler) = self.handlers.read().get(handler_index) { if events.is_hup() { self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index }); @@ -229,11 +232,11 @@ impl Handler for IoManager where Message: Send + Clone + Sync } fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { - let handler_index = token.as_usize() / TOKENS_PER_HANDLER; - let token_id = token.as_usize() % TOKENS_PER_HANDLER; + let handler_index = token.0 / TOKENS_PER_HANDLER; + let token_id = token.0 % TOKENS_PER_HANDLER; if let Some(handler) = self.handlers.read().get(handler_index) { - if let Some(timer) = self.timers.read().get(&token.as_usize()) { - event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer"); + if let Some(timer) = self.timers.read().get(&token.0) { + event_loop.timeout(token, Duration::from_millis(timer.delay)).expect("Error re-registering user timer"); self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index }); self.work_ready.notify_all(); } @@ -258,18 +261,18 @@ impl Handler for IoManager where Message: Send + Clone + Sync let to_remove: Vec<_> = timers.keys().cloned().filter(|timer_id| timer_id / TOKENS_PER_HANDLER == handler_id).collect(); for timer_id in to_remove { let timer = timers.remove(&timer_id).expect("to_remove only contains keys from timers; qed"); - event_loop.clear_timeout(timer.timeout); + event_loop.clear_timeout(&timer.timeout); } }, IoMessage::AddTimer { handler_id, token, delay } => { let timer_id = token + handler_id * TOKENS_PER_HANDLER; - let timeout = event_loop.timeout_ms(Token(timer_id), delay).expect("Error registering user timer"); + let timeout = event_loop.timeout(Token(timer_id), Duration::from_millis(delay)).expect("Error registering user timer"); self.timers.write().insert(timer_id, UserTimer { delay: delay, timeout: timeout }); }, IoMessage::RemoveTimer { handler_id, token } => { let timer_id = token + handler_id * TOKENS_PER_HANDLER; if let Some(timer) = self.timers.write().remove(&timer_id) { - event_loop.clear_timeout(timer.timeout); + event_loop.clear_timeout(&timer.timeout); } }, IoMessage::RegisterStream { handler_id, token } => { @@ -283,7 +286,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync // unregister a timer associated with the token (if any) let timer_id = token + handler_id * TOKENS_PER_HANDLER; if let Some(timer) = self.timers.write().remove(&timer_id) { - event_loop.clear_timeout(timer.timeout); + event_loop.clear_timeout(&timer.timeout); } } }, @@ -372,7 +375,7 @@ impl IoChannel where Message: Send + Clone + Sync + 'static { pub struct IoService where Message: Send + Sync + Clone + 'static { panic_handler: Arc, thread: Option>, - host_channel: Sender>, + host_channel: Mutex>>, handlers: Arc>, HandlerId>>>, } @@ -386,9 +389,9 @@ impl IoService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop pub fn start() -> Result, IoError> { let panic_handler = PanicHandler::new_in_arc(); - let mut config = EventLoopConfig::new(); + let mut config = EventLoopBuilder::new(); config.messages_per_tick(1024); - let mut event_loop = EventLoop::configured(config).expect("Error creating event loop"); + let mut event_loop = config.build().expect("Error creating event loop"); let channel = event_loop.channel(); let panic = panic_handler.clone(); let handlers = Arc::new(RwLock::new(Slab::new(MAX_HANDLERS))); @@ -402,14 +405,14 @@ impl IoService where Message: Send + Sync + Clone + 'static { Ok(IoService { panic_handler: panic_handler, thread: Some(thread), - host_channel: channel, + host_channel: Mutex::new(channel), handlers: handlers, }) } /// Regiter an IO handler with the event loop. pub fn register_handler(&self, handler: Arc+Send>) -> Result<(), IoError> { - try!(self.host_channel.send(IoMessage::AddHandler { + try!(self.host_channel.lock().send(IoMessage::AddHandler { handler: handler, })); Ok(()) @@ -417,20 +420,20 @@ impl IoService where Message: Send + Sync + Clone + 'static { /// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads. pub fn send_message(&self, message: Message) -> Result<(), IoError> { - try!(self.host_channel.send(IoMessage::UserMessage(message))); + try!(self.host_channel.lock().send(IoMessage::UserMessage(message))); Ok(()) } /// Create a new message channel pub fn channel(&self) -> IoChannel { - IoChannel::new(self.host_channel.clone(), Arc::downgrade(&self.handlers)) + IoChannel::new(self.host_channel.lock().clone(), Arc::downgrade(&self.handlers)) } } impl Drop for IoService where Message: Send + Sync + Clone { fn drop(&mut self) { trace!(target: "shutdown", "[IoService] Closing..."); - self.host_channel.send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e)); + self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e)); self.thread.take().unwrap().join().ok(); trace!(target: "shutdown", "[IoService] Closed."); } diff --git a/util/network/Cargo.toml b/util/network/Cargo.toml index 8b566fcf2..1a79df0e0 100644 --- a/util/network/Cargo.toml +++ b/util/network/Cargo.toml @@ -8,7 +8,8 @@ authors = ["Ethcore "] [dependencies] log = "0.3" -mio = { git = "https://github.com/ethcore/mio", branch = "v0.5.x" } +mio = { git = "https://github.com/carllerche/mio" } +bytes = "0.3.0" rand = "0.3.12" time = "0.1.34" tiny-keccak = "1.0" diff --git a/util/network/src/connection.rs b/util/network/src/connection.rs index aadec31b9..0d7c14239 100644 --- a/util/network/src/connection.rs +++ b/util/network/src/connection.rs @@ -18,7 +18,8 @@ use std::sync::Arc; use std::collections::VecDeque; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; -use mio::{Handler, Token, EventSet, EventLoop, PollOpt, TryRead, TryWrite}; +use mio::{Token, Ready, PollOpt}; +use mio::deprecated::{Handler, EventLoop, TryRead, TryWrite}; use mio::tcp::*; use util::hash::*; use util::sha3::*; @@ -34,6 +35,7 @@ use rcrypto::aessafe::*; use rcrypto::symmetriccipher::*; use rcrypto::buffer::*; use tiny_keccak::Keccak; +use bytes::{Buf, MutBuf}; use crypto; const ENCRYPTED_HEADER_LEN: usize = 32; @@ -57,7 +59,7 @@ pub struct GenericConnection { /// Send out packets FIFO send_queue: VecDeque>, /// Event flags this connection expects - interest: EventSet, + interest: Ready, /// Shared network statistics stats: Arc, /// Registered flag @@ -81,8 +83,9 @@ impl GenericConnection { let sock_ref = ::by_ref(&mut self.socket); loop { let max = self.rec_size - self.rec_buf.len(); - match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) { + match sock_ref.take(max as u64).try_read(unsafe { self.rec_buf.mut_bytes() }) { Ok(Some(size)) if size != 0 => { + unsafe { self.rec_buf.advance(size); } self.stats.inc_recv(size); trace!(target:"network", "{}: Read {} of {} bytes", self.token, self.rec_buf.len(), self.rec_size); if self.rec_size != 0 && self.rec_buf.len() == self.rec_size { @@ -109,7 +112,7 @@ impl GenericConnection { trace!(target:"network", "{}: Sending {} bytes", self.token, data.len()); self.send_queue.push_back(Cursor::new(data)); if !self.interest.is_writable() { - self.interest.insert(EventSet::writable()); + self.interest.insert(Ready::writable()); } io.update_registration(self.token).ok(); } @@ -128,16 +131,19 @@ impl GenericConnection { { let buf = self.send_queue.front_mut().unwrap(); let send_size = buf.get_ref().len(); - if (buf.position() as usize) >= send_size { + let pos = buf.position() as usize; + if (pos as usize) >= send_size { warn!(target:"net", "Unexpected connection data"); return Ok(WriteStatus::Complete) } - match self.socket.try_write_buf(buf) { - Ok(Some(size)) if (buf.position() as usize) < send_size => { + let buf = buf as &mut Buf; + match self.socket.try_write(buf.bytes()) { + Ok(Some(size)) if (pos + size) < send_size => { + buf.advance(size); self.stats.inc_send(size); Ok(WriteStatus::Ongoing) }, - Ok(Some(size)) if (buf.position() as usize) == send_size => { + Ok(Some(size)) if (pos + size) == send_size => { self.stats.inc_send(size); trace!(target:"network", "{}: Wrote {} bytes", self.token, send_size); Ok(WriteStatus::Complete) @@ -151,7 +157,7 @@ impl GenericConnection { self.send_queue.pop_front(); } if self.send_queue.is_empty() { - self.interest.remove(EventSet::writable()); + self.interest.remove(Ready::writable()); try!(io.update_registration(self.token)); } Ok(r) @@ -171,7 +177,7 @@ impl Connection { send_queue: VecDeque::new(), rec_buf: Bytes::new(), rec_size: 0, - interest: EventSet::hup() | EventSet::readable(), + interest: Ready::hup() | Ready::readable(), stats: stats, registered: AtomicBool::new(false), } @@ -205,7 +211,7 @@ impl Connection { rec_buf: Vec::new(), rec_size: 0, send_queue: self.send_queue.clone(), - interest: EventSet::hup(), + interest: Ready::hup(), stats: self.stats.clone(), registered: AtomicBool::new(false), }) @@ -499,7 +505,7 @@ mod tests { use std::sync::atomic::AtomicBool; use super::super::stats::*; use std::io::{Read, Write, Error, Cursor, ErrorKind}; - use mio::{EventSet}; + use mio::{Ready}; use std::collections::VecDeque; use util::bytes::*; use devtools::*; @@ -545,7 +551,7 @@ mod tests { send_queue: VecDeque::new(), rec_buf: Bytes::new(), rec_size: 0, - interest: EventSet::hup() | EventSet::readable(), + interest: Ready::hup() | Ready::readable(), stats: Arc::::new(NetworkStats::new()), registered: AtomicBool::new(false), } @@ -568,7 +574,7 @@ mod tests { send_queue: VecDeque::new(), rec_buf: Bytes::new(), rec_size: 0, - interest: EventSet::hup() | EventSet::readable(), + interest: Ready::hup() | Ready::readable(), stats: Arc::::new(NetworkStats::new()), registered: AtomicBool::new(false), } diff --git a/util/network/src/discovery.rs b/util/network/src/discovery.rs index 0aa775031..1f506ee89 100644 --- a/util/network/src/discovery.rs +++ b/util/network/src/discovery.rs @@ -20,6 +20,7 @@ use std::collections::{HashSet, HashMap, BTreeMap, VecDeque}; use std::mem; use std::default::Default; use mio::*; +use mio::deprecated::{Handler, EventLoop}; use mio::udp::*; use util::sha3::*; use time; @@ -108,7 +109,7 @@ pub struct TableUpdates { impl Discovery { pub fn new(key: &KeyPair, listen: SocketAddr, public: NodeEndpoint, token: StreamToken, allow_ips: AllowIP) -> Discovery { - let socket = UdpSocket::bound(&listen).expect("Error binding UDP socket"); + let socket = UdpSocket::bind(&listen).expect("Error binding UDP socket"); Discovery { id: key.public().clone(), id_hash: key.public().sha3(), @@ -532,15 +533,15 @@ impl Discovery { } pub fn register_socket(&self, event_loop: &mut EventLoop) -> Result<(), NetworkError> { - event_loop.register(&self.udp_socket, Token(self.token), EventSet::all(), PollOpt::edge()).expect("Error registering UDP socket"); + event_loop.register(&self.udp_socket, Token(self.token), Ready::all(), PollOpt::edge()).expect("Error registering UDP socket"); Ok(()) } pub fn update_registration(&self, event_loop: &mut EventLoop) -> Result<(), NetworkError> { let registration = if !self.send_queue.is_empty() { - EventSet::readable() | EventSet::writable() + Ready::readable() | Ready::writable() } else { - EventSet::readable() + Ready::readable() }; event_loop.reregister(&self.udp_socket, Token(self.token), registration, PollOpt::edge()).expect("Error reregistering UDP socket"); Ok(()) diff --git a/util/network/src/host.rs b/util/network/src/host.rs index d6e530d6f..9b9906cc3 100644 --- a/util/network/src/host.rs +++ b/util/network/src/host.rs @@ -26,6 +26,7 @@ use std::io::{Read, Write}; use std::fs; use ethkey::{KeyPair, Secret, Random, Generator}; use mio::*; +use mio::deprecated::{EventLoop}; use mio::tcp::*; use util::hash::*; use util::Hashable; @@ -744,8 +745,7 @@ impl Host { trace!(target: "network", "Accepting incoming connection"); loop { let socket = match self.tcp_listener.lock().accept() { - Ok(None) => break, - Ok(Some((sock, _addr))) => sock, + Ok((sock, _addr)) => sock, Err(e) => { warn!("Error accepting connection: {:?}", e); break @@ -1101,7 +1101,7 @@ impl IoHandler for Host { } } DISCOVERY => self.discovery.lock().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), - TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), + TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"), _ => warn!("Unexpected stream registration") } } @@ -1129,7 +1129,7 @@ impl IoHandler for Host { } } DISCOVERY => self.discovery.lock().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), - TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), + TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"), _ => warn!("Unexpected stream update") } } diff --git a/util/network/src/lib.rs b/util/network/src/lib.rs index fcd36235a..f21cb498d 100644 --- a/util/network/src/lib.rs +++ b/util/network/src/lib.rs @@ -70,6 +70,7 @@ extern crate slab; extern crate ethkey; extern crate ethcrypto as crypto; extern crate rlp; +extern crate bytes; #[macro_use] extern crate log; diff --git a/util/network/src/session.rs b/util/network/src/session.rs index 6d6009535..b1224f56d 100644 --- a/util/network/src/session.rs +++ b/util/network/src/session.rs @@ -19,6 +19,7 @@ use std::net::SocketAddr; use std::cmp::Ordering; use std::sync::*; use mio::*; +use mio::deprecated::{Handler, EventLoop}; use mio::tcp::*; use util::hash::*; use rlp::*;