From 982063e1ac8db74b263960d5cfaba0428064acfe Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 12 Jan 2016 17:33:40 +0100 Subject: [PATCH 01/13] Started IO service refactoring --- src/io/mod.rs | 68 +++++++++++++++ src/io/service.rs | 207 ++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 3 files changed, 276 insertions(+) create mode 100644 src/io/mod.rs create mode 100644 src/io/service.rs diff --git a/src/io/mod.rs b/src/io/mod.rs new file mode 100644 index 000000000..e8562e64a --- /dev/null +++ b/src/io/mod.rs @@ -0,0 +1,68 @@ +/// General IO module. +/// +/// Example usage for craeting a network service and adding an IO handler: +/// +/// ```rust +/// extern crate ethcore_util as util; +/// use util::network::*; +/// +/// struct MyHandler; +/// +/// impl ProtocolHandler for MyHandler { +/// fn initialize(&mut self, io: &mut HandlerIo) { +/// io.register_timer(1000); +/// } +/// +/// fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]) { +/// println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer); +/// } +/// +/// fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId) { +/// println!("Connected {}", peer); +/// } +/// +/// fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId) { +/// println!("Disconnected {}", peer); +/// } +/// +/// fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken) { +/// println!("Timeout {}", timer); +/// } +/// +/// fn message(&mut self, io: &mut HandlerIo, message: &Message) { +/// println!("Message {}:{}", message.protocol, message.id); +/// } +/// } +/// +/// fn main () { +/// let mut service = NetworkService::start().expect("Error creating network service"); +/// service.register_protocol(Box::new(MyHandler), "myproto", &[1u8]); +/// +/// // Wait for quit condition +/// // ... +/// // Drop the service +/// } +/// ``` +extern crate mio; +mod service; + +#[derive(Debug)] +pub enum IoError { + Mio(::std::io::Error), +} + +impl From<::mio::NotifyError>> for IoError { + fn from(_err: ::mio::NotifyError>) -> IoError { + IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) + } +} + +pub type TimerToken = service::TimerToken; +pub type StreamToken = service::StreamToken; +pub type IoContext<'s, M> = service::IoContext<'s, M>; +pub type Message = service::UserMessage; +pub type IoService = service::IoService; +pub type IoHandler = service::IoHandler; + + + diff --git a/src/io/service.rs b/src/io/service.rs new file mode 100644 index 000000000..4ecc34723 --- /dev/null +++ b/src/io/service.rs @@ -0,0 +1,207 @@ +use std::thread::{self, JoinHandle}; +use mio::*; +use mio::util::{Slab}; +use hash::*; +use rlp::*; +use error::*; +use io::IoError; + +/// Generic IO handler. +/// All the handler function are called from within IO event loop. +pub trait IoHandler: Send where M: Send + 'static { + /// Initialize the hadler + fn initialize(&mut self, _io: &mut IoContext) {} + /// Timer function called after a timeout created with `HandlerIo::timeout`. + fn timeout(&mut self, _io: &mut IoContext, _timer: TimerToken) {} + /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. + fn message(&mut self, _io: &mut IoContext, _message: &M) {} + /// Called when an IO stream gets closed + fn stream_hup(&mut self, _io: &mut IoContext, _stream: StreamToken) {} + /// Called when an IO stream can be read from + fn stream_readable(&mut self, _io: &mut IoContext, _stream: StreamToken) {} + /// Called when an IO stream can be written to + fn stream_writable(&mut self, _io: &mut IoContext, _stream: StreamToken) {} +} + +pub type TimerToken = usize; +pub type StreamToken = usize; + +// Tokens +const MAX_USER_TIMERS: usize = 32; +const USER_TIMER: usize = 0; +const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; + +/// Messages used to communicate with the event loop from other threads. +pub enum IoMessage { + /// Shutdown the event loop + Shutdown, + /// Register a new protocol handler. + AddHandler { + handler: Box+Send>, + }, + /// Broadcast a message across all protocol handlers. + UserMessage(UserMessage), +} + +/// User +pub struct UserMessage { + pub data: M, +} + +/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. +pub struct IoContext<'s, M> where M: Send + 'static { + timers: &'s mut Slab, + event_loop: &'s mut EventLoop>, +} + +impl<'s, M> IoContext<'s, M> where M: Send + 'static { + /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler. + fn new(event_loop: &'s mut EventLoop>, timers: &'s mut Slab) -> IoContext<'s, M> { + IoContext { + event_loop: event_loop, + timers: timers, + } + } + + /// Register a new IO timer. Returns a new timer token. 'IoHandler::timeout' will be called with the token. + pub fn register_timer(&mut self, ms: u64) -> Result{ + match self.timers.insert(UserTimer { + delay: ms, + }) { + Ok(token) => { + self.event_loop.timeout_ms(token, ms).expect("Error registering user timer"); + Ok(token.as_usize()) + }, + _ => { panic!("Max timers reached") } + } + } + + /// Broadcast a message to other IO clients + pub fn message(&mut self, message: M) { + match self.event_loop.channel().send(IoMessage::UserMessage(UserMessage { + data: message + })) { + Ok(_) => {} + Err(e) => { panic!("Error sending io message {:?}", e); } + } + } +} + +struct UserTimer { + delay: u64, +} + +/// Root IO handler. Manages user handlers, messages and IO timers. +pub struct IoManager where M: Send { + timers: Slab, + handlers: Vec>>, +} + +impl IoManager where M: Send + 'static { + /// Creates a new instance and registers it with the event loop. + pub fn start(event_loop: &mut EventLoop>) -> Result<(), UtilError> { + let mut io = IoManager { + timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS), + handlers: Vec::new(), + }; + try!(event_loop.run(&mut io)); + Ok(()) + } +} + +impl Handler for IoManager where M: Send + 'static { + type Timeout = Token; + type Message = IoMessage; + + fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { + if events.is_hup() { + for h in self.handlers.iter_mut() { + h.stream_hup(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + } + else if events.is_readable() { + for h in self.handlers.iter_mut() { + h.stream_readable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + } + else if events.is_writable() { + for h in self.handlers.iter_mut() { + h.stream_writable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + } + } + + fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { + match token.as_usize() { + USER_TIMER ... LAST_USER_TIMER => { + let delay = { + let timer = self.timers.get_mut(token).expect("Unknown user timer token"); + timer.delay + }; + for h in self.handlers.iter_mut() { + h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + event_loop.timeout_ms(token, delay).expect("Error re-registering user timer"); + } + _ => { // Just pass the event down. IoHandler is supposed to re-register it if required. + for h in self.handlers.iter_mut() { + h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + } + } + } + + fn notify(&mut self, event_loop: &mut EventLoop, msg: Self::Message) { + match msg { + IoMessage::Shutdown => event_loop.shutdown(), + IoMessage::AddHandler { + handler, + } => { + self.handlers.push(handler); + }, + IoMessage::UserMessage(message) => { + for h in self.handlers.iter_mut() { + h.message(&mut IoContext::new(event_loop, &mut self.timers), &message.data); + } + } + } + } +} + +/// General IO Service. Starts an event loop and dispatches IO requests. +/// 'M' is a notification message type +pub struct IoService where M: Send + 'static { + thread: Option>, + host_channel: Sender> +} + +impl IoService where M: Send + 'static { + /// Starts IO event loop + pub fn start() -> Result, UtilError> { + let mut event_loop = EventLoop::new().unwrap(); + let channel = event_loop.channel(); + let thread = thread::spawn(move || { + IoManager::::start(&mut event_loop).unwrap(); //TODO: + }); + Ok(IoService { + thread: Some(thread), + host_channel: channel + }) + } + + /// Regiter a IO hadnler with the event loop. + pub fn register_handler(&mut self, handler: Box+Send>) -> Result<(), IoError> { + try!(self.host_channel.send(IoMessage::AddHandler { + handler: handler, + })); + Ok(()) + } +} + +impl Drop for IoService where M: Send { + fn drop(&mut self) { + self.host_channel.send(IoMessage::Shutdown).unwrap(); + self.thread.take().unwrap().join().unwrap(); + } +} + diff --git a/src/lib.rs b/src/lib.rs index b856431cd..acd96ee5f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,6 +71,7 @@ pub mod nibbleslice; pub mod heapsizeof; pub mod squeeze; pub mod semantic_version; +pub mod io; pub mod network; pub use common::*; From c98f73c5c9953bb122322dc5a3b64a7021aa5c41 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 13 Jan 2016 11:31:37 +0100 Subject: [PATCH 02/13] Finished splitting IoService and NetworkService --- src/error.rs | 10 +- src/io/mod.rs | 23 +- src/io/service.rs | 58 ++--- src/network/connection.rs | 17 +- src/network/discovery.rs | 6 +- src/network/handshake.rs | 11 +- src/network/host.rs | 470 +++++++++++++++----------------------- src/network/mod.rs | 84 ++----- src/network/service.rs | 43 ++-- src/network/session.rs | 11 +- 10 files changed, 296 insertions(+), 437 deletions(-) diff --git a/src/error.rs b/src/error.rs index 17b65ceec..04f7b96ce 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,6 +3,7 @@ use rustc_serialize::hex::FromHexError; use network::NetworkError; use rlp::DecoderError; +use io; #[derive(Debug)] pub enum BaseDataError { @@ -13,7 +14,8 @@ pub enum BaseDataError { /// General error type which should be capable of representing all errors in ethcore. pub enum UtilError { Crypto(::crypto::CryptoError), - Io(::std::io::Error), + StdIo(::std::io::Error), + Io(io::IoError), AddressParse(::std::net::AddrParseError), AddressResolve(Option<::std::io::Error>), FromHex(FromHexError), @@ -43,6 +45,12 @@ impl From for UtilError { impl From<::std::io::Error> for UtilError { fn from(err: ::std::io::Error) -> UtilError { + UtilError::StdIo(err) + } +} + +impl From for UtilError { + fn from(err: io::IoError) -> UtilError { UtilError::Io(err) } } diff --git a/src/io/mod.rs b/src/io/mod.rs index e8562e64a..72906b6f4 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -51,18 +51,35 @@ pub enum IoError { Mio(::std::io::Error), } -impl From<::mio::NotifyError>> for IoError { +impl From<::mio::NotifyError>> for IoError where M: Send { fn from(_err: ::mio::NotifyError>) -> IoError { IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) } } +/// Generic IO handler. +/// All the handler function are called from within IO event loop. +/// `Message` type is used as notification data +pub trait IoHandler: Send where Message: Send + 'static { + /// Initialize the handler + fn initialize(&mut self, _io: IoContext) {} + /// Timer function called after a timeout created with `HandlerIo::timeout`. + fn timeout(&mut self, _io: IoContext, _timer: TimerToken) {} + /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. + fn message(&mut self, _io: IoContext, _message: &mut Message) {} // TODO: make message immutable and provide internal channel for adding network handler + /// Called when an IO stream gets closed + fn stream_hup(&mut self, _io: IoContext, _stream: StreamToken) {} + /// Called when an IO stream can be read from + fn stream_readable(&mut self, _io: IoContext, _stream: StreamToken) {} + /// Called when an IO stream can be written to + fn stream_writable(&mut self, _io: IoContext, _stream: StreamToken) {} +} + pub type TimerToken = service::TimerToken; pub type StreamToken = service::StreamToken; pub type IoContext<'s, M> = service::IoContext<'s, M>; -pub type Message = service::UserMessage; pub type IoService = service::IoService; -pub type IoHandler = service::IoHandler; +pub const USER_TOKEN_START: usize = service::USER_TOKEN; diff --git a/src/io/service.rs b/src/io/service.rs index 4ecc34723..66814ba3c 100644 --- a/src/io/service.rs +++ b/src/io/service.rs @@ -4,24 +4,7 @@ use mio::util::{Slab}; use hash::*; use rlp::*; use error::*; -use io::IoError; - -/// Generic IO handler. -/// All the handler function are called from within IO event loop. -pub trait IoHandler: Send where M: Send + 'static { - /// Initialize the hadler - fn initialize(&mut self, _io: &mut IoContext) {} - /// Timer function called after a timeout created with `HandlerIo::timeout`. - fn timeout(&mut self, _io: &mut IoContext, _timer: TimerToken) {} - /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. - fn message(&mut self, _io: &mut IoContext, _message: &M) {} - /// Called when an IO stream gets closed - fn stream_hup(&mut self, _io: &mut IoContext, _stream: StreamToken) {} - /// Called when an IO stream can be read from - fn stream_readable(&mut self, _io: &mut IoContext, _stream: StreamToken) {} - /// Called when an IO stream can be written to - fn stream_writable(&mut self, _io: &mut IoContext, _stream: StreamToken) {} -} +use io::{IoError, IoHandler}; pub type TimerToken = usize; pub type StreamToken = usize; @@ -30,9 +13,10 @@ pub type StreamToken = usize; const MAX_USER_TIMERS: usize = 32; const USER_TIMER: usize = 0; const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; +pub const USER_TOKEN: usize = LAST_USER_TIMER + 1; /// Messages used to communicate with the event loop from other threads. -pub enum IoMessage { +pub enum IoMessage where M: Send + Sized { /// Shutdown the event loop Shutdown, /// Register a new protocol handler. @@ -40,18 +24,13 @@ pub enum IoMessage { handler: Box+Send>, }, /// Broadcast a message across all protocol handlers. - UserMessage(UserMessage), -} - -/// User -pub struct UserMessage { - pub data: M, + UserMessage(M) } /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. pub struct IoContext<'s, M> where M: Send + 'static { timers: &'s mut Slab, - event_loop: &'s mut EventLoop>, + pub event_loop: &'s mut EventLoop>, } impl<'s, M> IoContext<'s, M> where M: Send + 'static { @@ -78,9 +57,7 @@ impl<'s, M> IoContext<'s, M> where M: Send + 'static { /// Broadcast a message to other IO clients pub fn message(&mut self, message: M) { - match self.event_loop.channel().send(IoMessage::UserMessage(UserMessage { - data: message - })) { + match self.event_loop.channel().send(IoMessage::UserMessage(message)) { Ok(_) => {} Err(e) => { panic!("Error sending io message {:?}", e); } } @@ -116,17 +93,17 @@ impl Handler for IoManager where M: Send + 'static { fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { if events.is_hup() { for h in self.handlers.iter_mut() { - h.stream_hup(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.stream_hup(IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } else if events.is_readable() { for h in self.handlers.iter_mut() { - h.stream_readable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.stream_readable(IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } else if events.is_writable() { for h in self.handlers.iter_mut() { - h.stream_writable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.stream_writable(IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } } @@ -139,29 +116,30 @@ impl Handler for IoManager where M: Send + 'static { timer.delay }; for h in self.handlers.iter_mut() { - h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.timeout(IoContext::new(event_loop, &mut self.timers), token.as_usize()); } event_loop.timeout_ms(token, delay).expect("Error re-registering user timer"); } _ => { // Just pass the event down. IoHandler is supposed to re-register it if required. for h in self.handlers.iter_mut() { - h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.timeout(IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } } } fn notify(&mut self, event_loop: &mut EventLoop, msg: Self::Message) { - match msg { + let mut m = msg; + match m { IoMessage::Shutdown => event_loop.shutdown(), IoMessage::AddHandler { handler, } => { self.handlers.push(handler); }, - IoMessage::UserMessage(message) => { + IoMessage::UserMessage(ref mut data) => { for h in self.handlers.iter_mut() { - h.message(&mut IoContext::new(event_loop, &mut self.timers), &message.data); + h.message(IoContext::new(event_loop, &mut self.timers), data); } } } @@ -196,6 +174,12 @@ impl IoService where M: Send + 'static { })); Ok(()) } + + /// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads. + pub fn send_message(&mut self, message: M) -> Result<(), IoError> { + try!(self.host_channel.send(IoMessage::UserMessage(message))); + Ok(()) + } } impl Drop for IoService where M: Send { diff --git a/src/network/connection.rs b/src/network/connection.rs index 2c671a08a..f11c10384 100644 --- a/src/network/connection.rs +++ b/src/network/connection.rs @@ -1,14 +1,13 @@ use std::collections::VecDeque; -use mio::{Token, EventSet, EventLoop, Timeout, PollOpt, TryRead, TryWrite}; +use mio::{Handler, Token, EventSet, EventLoop, Timeout, PollOpt, TryRead, TryWrite}; use mio::tcp::*; use hash::*; use sha3::*; use bytes::*; use rlp::*; use std::io::{self, Cursor, Read}; -use network::host::{Host}; use error::*; -use network::NetworkError; +use network::error::NetworkError; use network::handshake::Handshake; use crypto; use rcrypto::blockmodes::*; @@ -133,7 +132,7 @@ impl Connection { } /// Register this connection with the IO event loop. - pub fn register(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { + pub fn register(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection register; token={:?}", self.token); self.interest.insert(EventSet::readable()); event_loop.register(&self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { @@ -143,7 +142,7 @@ impl Connection { } /// Update connection registration. Should be called at the end of the IO handler. - pub fn reregister(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { + pub fn reregister(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection reregister; token={:?}", self.token); event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { error!("Failed to reregister {:?}, {:?}", self.token, e); @@ -338,7 +337,7 @@ impl EncryptedConnection { } /// Readable IO handler. Tracker receive status and returns decoded packet if avaialable. - pub fn readable(&mut self, event_loop: &mut EventLoop) -> Result, UtilError> { + pub fn readable(&mut self, event_loop: &mut EventLoop) -> Result, UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); match self.read_state { EncryptedConnectionState::Header => { @@ -364,14 +363,14 @@ impl EncryptedConnection { } /// Writable IO handler. Processes send queeue. - pub fn writable(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn writable(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); try!(self.connection.writable()); Ok(()) } /// Register this connection with the event handler. - pub fn register(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn register>(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.connection.expect(ENCRYPTED_HEADER_LEN); self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok(); @@ -380,7 +379,7 @@ impl EncryptedConnection { } /// Update connection registration. This should be called at the end of the event loop. - pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { try!(self.connection.reregister(event_loop)); Ok(()) } diff --git a/src/network/discovery.rs b/src/network/discovery.rs index f2d139f45..d2a4f21a2 100644 --- a/src/network/discovery.rs +++ b/src/network/discovery.rs @@ -10,7 +10,7 @@ use mio::udp::*; use hash::*; use sha3::Hashable; use crypto::*; -use network::host::*; +use network::node::*; const ADDRESS_BYTES_SIZE: u32 = 32; ///< Size of address type in bytes. const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; ///< Denoted by n in [Kademlia]. @@ -70,14 +70,14 @@ impl Discovery { self.node_buckets[Discovery::distance(&self.id, &id) as usize].nodes.push(id.clone()); } - fn start_node_discovery(&mut self, event_loop: &mut EventLoop) { + fn start_node_discovery(&mut self, event_loop: &mut EventLoop) { self.discovery_round = 0; self.discovery_id.randomize(); self.discovery_nodes.clear(); self.discover(event_loop); } - fn discover(&mut self, event_loop: &mut EventLoop) { + fn discover(&mut self, event_loop: &mut EventLoop) { if self.discovery_round == DISCOVERY_MAX_STEPS { debug!("Restarting discovery"); diff --git a/src/network/handshake.rs b/src/network/handshake.rs index af67f7ddf..ca95808b4 100644 --- a/src/network/handshake.rs +++ b/src/network/handshake.rs @@ -6,9 +6,10 @@ use bytes::Bytes; use crypto::*; use crypto; use network::connection::{Connection}; -use network::host::{NodeId, Host, HostInfo}; +use network::host::{HostInfo}; +use network::node::NodeId; use error::*; -use network::NetworkError; +use network::error::NetworkError; #[derive(PartialEq, Eq, Debug)] enum HandshakeState { @@ -88,7 +89,7 @@ impl Handshake { } /// Readable IO handler. Drives the state change. - pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), UtilError> { + pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); match self.state { HandshakeState::ReadingAuth => { @@ -118,7 +119,7 @@ impl Handshake { } /// Writabe IO handler. - pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { + pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); try!(self.connection.writable()); if self.state != HandshakeState::StartSession { @@ -128,7 +129,7 @@ impl Handshake { } /// Register the IO handler with the event loop - pub fn register(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn register>(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok(); try!(self.connection.register(event_loop)); diff --git a/src/network/host.rs b/src/network/host.rs index 11dc491b2..d61ef614c 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -1,6 +1,7 @@ -use std::net::{SocketAddr, ToSocketAddrs}; +use std::mem; +use std::net::{SocketAddr }; use std::collections::{HashMap}; -use std::hash::{Hash, Hasher}; +use std::hash::{Hasher}; use std::str::{FromStr}; use mio::*; use mio::util::{Slab}; @@ -10,23 +11,18 @@ use hash::*; use crypto::*; use sha3::Hashable; use rlp::*; -use time::Tm; use network::handshake::Handshake; use network::session::{Session, SessionData}; use error::*; -use network::ProtocolHandler; +use io::*; +use network::NetworkProtocolHandler; +use network::node::*; const _DEFAULT_PORT: u16 = 30304; const MAX_CONNECTIONS: usize = 1024; -const MAX_USER_TIMERS: usize = 32; const IDEAL_PEERS: u32 = 10; -/// Node public key -pub type NodeId = H512; -/// IO Timer id -pub type TimerToken = usize; - #[derive(Debug)] struct NetworkConfiguration { listen_address: SocketAddr, @@ -48,88 +44,15 @@ impl NetworkConfiguration { } } -#[derive(Debug)] -/// Noe address info -pub struct NodeEndpoint { - /// IP(V4 or V6) address - address: SocketAddr, - /// Address as string (can be host name). - address_str: String, - /// Conneciton port. - udp_port: u16 -} - -impl NodeEndpoint { - /// Create endpoint from string. Performs name resolution if given a host name. - fn from_str(s: &str) -> Result { - let address = s.to_socket_addrs().map(|mut i| i.next()); - match address { - Ok(Some(a)) => Ok(NodeEndpoint { - address: a, - address_str: s.to_string(), - udp_port: a.port() - }), - Ok(_) => Err(UtilError::AddressResolve(None)), - Err(e) => Err(UtilError::AddressResolve(Some(e))) - } - } -} - -#[derive(PartialEq, Eq, Copy, Clone)] -enum PeerType { - Required, - Optional -} - -struct Node { - id: NodeId, - endpoint: NodeEndpoint, - peer_type: PeerType, - last_attempted: Option, -} - -impl FromStr for Node { - type Err = UtilError; - fn from_str(s: &str) -> Result { - let (id, endpoint) = if &s[0..8] == "enode://" && s.len() > 136 && &s[136..137] == "@" { - (try!(NodeId::from_str(&s[8..136])), try!(NodeEndpoint::from_str(&s[137..]))) - } - else { - (NodeId::new(), try!(NodeEndpoint::from_str(s))) - }; - - Ok(Node { - id: id, - endpoint: endpoint, - peer_type: PeerType::Optional, - last_attempted: None, - }) - } -} - -impl PartialEq for Node { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} -impl Eq for Node { } - -impl Hash for Node { - fn hash(&self, state: &mut H) where H: Hasher { - self.id.hash(state) - } -} - // Tokens -const TCP_ACCEPT: usize = 1; -const IDLE: usize = 3; -const NODETABLE_RECEIVE: usize = 4; -const NODETABLE_MAINTAIN: usize = 5; -const NODETABLE_DISCOVERY: usize = 6; -const FIRST_CONNECTION: usize = 7; +const TOKEN_BEGIN: usize = USER_TOKEN_START; +const TCP_ACCEPT: usize = TOKEN_BEGIN; +const IDLE: usize = TOKEN_BEGIN + 1; +const NODETABLE_RECEIVE: usize = TOKEN_BEGIN + 2; +const NODETABLE_MAINTAIN: usize = TOKEN_BEGIN + 3; +const NODETABLE_DISCOVERY: usize = TOKEN_BEGIN + 4; +const FIRST_CONNECTION: usize = TOKEN_BEGIN + 16; const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1; -const USER_TIMER: usize = LAST_CONNECTION; -const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; /// Protocol handler level packet id pub type PacketId = u8; @@ -137,12 +60,10 @@ pub type PacketId = u8; pub type ProtocolId = &'static str; /// Messages used to communitate with the event loop from other threads. -pub enum HostMessage { - /// Shutdown the event loop - Shutdown, +pub enum NetworkIoMessage where Message: Send { /// Register a new protocol handler. AddHandler { - handler: Box, + handler: Option+Send>>, protocol: ProtocolId, versions: Vec, }, @@ -153,19 +74,11 @@ pub enum HostMessage { protocol: ProtocolId, data: Vec, }, - /// Broadcast a message across the protocol handlers. - UserMessage(UserMessage), -} - -/// Id for broadcast message -pub type UserMessageId = u32; - -/// User -pub struct UserMessage { - /// ID of a protocol - pub protocol: ProtocolId, - pub id: UserMessageId, - pub data: Option>, + /// User message + User { + protocol: ProtocolId, + message: Message, + }, } /// Local (temporary) peer session ID. @@ -190,31 +103,38 @@ impl Encodable for CapabilityInfo { } /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. -pub struct HostIo<'s> { - protocol: ProtocolId, +pub struct NetworkContext<'s, Message> where Message: Send + 'static { + io: IoContext<'s, NetworkIoMessage>, + protocol: Option, connections: &'s mut Slab, - timers: &'s mut Slab, - session: Option, - event_loop: &'s mut EventLoop, + timers: &'s mut HashMap, + session: Option, } -impl<'s> HostIo<'s> { - /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler. - fn new(protocol: ProtocolId, session: Option, event_loop: &'s mut EventLoop, connections: &'s mut Slab, timers: &'s mut Slab) -> HostIo<'s> { - HostIo { +impl<'s, Message> NetworkContext<'s, Message> where Message: Send + 'static, { + /// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler. + fn new(io: IoContext<'s, NetworkIoMessage>, + protocol: Option, + session: Option, connections: &'s mut Slab, + timers: &'s mut HashMap) -> NetworkContext<'s, Message> { + NetworkContext { + io: io, protocol: protocol, session: session, - event_loop: event_loop, connections: connections, timers: timers, } } + fn set_protocol(&mut self, protocol: ProtocolId) { + self.protocol = Some(protocol); + } + /// Send a packet over the network to another peer. pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { match self.connections.get_mut(Token(peer)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { + s.send_packet(self.protocol.unwrap(), packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); }); //TODO: don't copy vector data }, @@ -228,49 +148,28 @@ impl<'s> HostIo<'s> { /// Respond to a current network message. Panics if no there is no packet in the context. pub fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { match self.session { - Some(session) => self.send(session.as_usize(), packet_id, data), + Some(session) => self.send(session, packet_id, data), None => { panic!("Respond: Session does not exist") } } } - /// Register a new IO timer. Returns a new timer toke. 'ProtocolHandler::timeout' will be called with the token. - pub fn register_timer(&mut self, ms: u64) -> Result{ - match self.timers.insert(UserTimer { - delay: ms, - protocol: self.protocol, - }) { - Ok(token) => { - self.event_loop.timeout_ms(token, ms).expect("Error registering user timer"); - Ok(token.as_usize()) - }, - _ => { panic!("Max timers reached") } - } - } - - /// Broadcast a message to other IO clients - pub fn message(&mut self, id: UserMessageId, data: Option>) { - match self.event_loop.channel().send(HostMessage::UserMessage(UserMessage { - protocol: self.protocol, - id: id, - data: data - })) { - Ok(_) => {} - Err(e) => { panic!("Error sending io message {:?}", e); } - } - } - /// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected. pub fn disable_peer(&mut self, _peer: PeerId) { //TODO: remove capability, disconnect if no capabilities left } -} - -struct UserTimer { - protocol: ProtocolId, - delay: u64, + /// Register a new IO timer. Returns a new timer token. 'NetworkProtocolHandler::timeout' will be called with the token. + pub fn register_timer(&mut self, ms: u64) -> Result{ + match self.io.register_timer(ms) { + Ok(token) => { + self.timers.insert(token, self.protocol.unwrap()); + Ok(token) + }, + e @ Err(_) => e, + } + } } /// Shared host information @@ -315,67 +214,40 @@ enum ConnectionEntry { } /// Root IO handler. Manages protocol handlers, IO timers and network connections. -pub struct Host { +pub struct Host where Message: Send { info: HostInfo, - _udp_socket: UdpSocket, - _listener: TcpListener, + udp_socket: UdpSocket, + listener: TcpListener, connections: Slab, - timers: Slab, + timers: HashMap, nodes: HashMap, - handlers: HashMap>, - _idle_timeout: Timeout, + handlers: HashMap>>, } -impl Host { - /// Creates a new instance and registers it with the event loop. - pub fn start(event_loop: &mut EventLoop) -> Result<(), UtilError> { +impl Host where Message: Send { + pub fn new() -> Host { let config = NetworkConfiguration::new(); - /* - match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { - Some(iface) => config.public_address = iface.addr.unwrap(), - None => warn!("No public network interface"), - */ - let addr = config.listen_address; // Setup the server socket let listener = TcpListener::bind(&addr).unwrap(); - // Start listening for incoming connections - event_loop.register(&listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap(); - let idle_timeout = event_loop.timeout_ms(Token(IDLE), 1000).unwrap(); //TODO: check delay - // open the udp socket let udp_socket = UdpSocket::bound(&addr).unwrap(); - event_loop.register(&udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap(); - event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap(); - let port = config.listen_address.port(); - - let mut host = Host { + Host:: { info: HostInfo { keys: KeyPair::create().unwrap(), config: config, nonce: H256::random(), protocol_version: 4, client_version: "parity".to_string(), - listen_port: port, + listen_port: 0, capabilities: Vec::new(), }, - _udp_socket: udp_socket, - _listener: listener, + udp_socket: udp_socket, + listener: listener, connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS), - timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS), + timers: HashMap::new(), nodes: HashMap::new(), handlers: HashMap::new(), - _idle_timeout: idle_timeout, - }; - - host.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303"); - host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@gav.ethdev.com:30300"); - host.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc@gav.ethdev.com:30303"); - host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); - host.add_node("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034@54.207.93.166:30303"); - host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@92.51.165.126:30303"); - - try!(event_loop.run(&mut host)); - Ok(()) + } } fn add_node(&mut self, id: &str) { @@ -387,8 +259,8 @@ impl Host { } } - fn maintain_network(&mut self, event_loop: &mut EventLoop) { - self.connect_peers(event_loop); + fn maintain_network(&mut self, io: IoContext>) { + self.connect_peers(io); } fn have_session(&self, id: &NodeId) -> bool { @@ -399,8 +271,7 @@ impl Host { self.connections.iter().any(|e| match e { &ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false }) } - fn connect_peers(&mut self, event_loop: &mut EventLoop) { - + fn connect_peers(&mut self, mut io: IoContext>) { struct NodeInfo { id: NodeId, peer_type: PeerType @@ -425,7 +296,7 @@ impl Host { for n in to_connect.iter() { if n.peer_type == PeerType::Required { if req_conn < IDEAL_PEERS { - self.connect_peer(&n.id, event_loop); + self.connect_peer(&n.id, &mut io); } req_conn += 1; } @@ -440,14 +311,14 @@ impl Host { for n in to_connect.iter() { if n.peer_type == PeerType::Optional && open_slots > 0 { open_slots -= 1; - self.connect_peer(&n.id, event_loop); + self.connect_peer(&n.id, &mut io); } } } } } - fn connect_peer(&mut self, id: &NodeId, event_loop: &mut EventLoop) { + fn connect_peer(&mut self, id: &NodeId, io: &mut IoContext>) { if self.have_session(id) { warn!("Aborted connect. Node already connected."); @@ -478,7 +349,7 @@ impl Host { match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { h.start(&self.info, true) - .and_then(|_| h.register(event_loop)) + .and_then(|_| h.register(io.event_loop)) .unwrap_or_else (|e| { debug!(target: "net", "Handshake create error: {:?}", e); }); @@ -491,23 +362,23 @@ impl Host { } - fn accept(&mut self, _event_loop: &mut EventLoop) { + fn accept(&mut self, _io: IoContext>) { trace!(target: "net", "accept"); } - fn connection_writable(&mut self, token: Token, event_loop: &mut EventLoop) { + fn connection_writable<'s>(&'s mut self, token: StreamToken, io: IoContext<'s, NetworkIoMessage>) { let mut kill = false; let mut create_session = false; - match self.connections.get_mut(token) { + match self.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { - h.writable(event_loop, &self.info).unwrap_or_else(|e| { + h.writable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Handshake write error: {:?}", e); kill = true; }); create_session = h.done(); }, Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.writable(event_loop, &self.info).unwrap_or_else(|e| { + s.writable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Session write error: {:?}", e); kill = true; }); @@ -516,39 +387,41 @@ impl Host { warn!(target: "net", "Received event for unknown connection"); } } + let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); if kill { - self.kill_connection(token, event_loop); + Host::kill_connection(token, &mut net_context, &mut self.handlers); } if create_session { - self.start_session(token, event_loop); + Host::start_session(&self.info, token, &mut net_context); } - match self.connections.get_mut(token) { + match net_context.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.reregister(event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); + s.reregister(net_context.io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, _ => (), } } - fn connection_closed(&mut self, token: Token, event_loop: &mut EventLoop) { - self.kill_connection(token, event_loop); + fn connection_closed<'s>(&'s mut self, token: TimerToken, io: IoContext<'s, NetworkIoMessage>) { + let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); + Host::kill_connection(token, &mut net_context, &mut self.handlers); } - fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop) { + fn connection_readable<'s>(&'s mut self, token: StreamToken, io: IoContext<'s, NetworkIoMessage>) { let mut kill = false; let mut create_session = false; let mut ready_data: Vec = Vec::new(); let mut packet_data: Option<(ProtocolId, PacketId, Vec)> = None; - match self.connections.get_mut(token) { + match self.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { - h.readable(event_loop, &self.info).unwrap_or_else(|e| { + h.readable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Handshake read error: {:?}", e); kill = true; }); create_session = h.done(); }, Some(&mut ConnectionEntry::Session(ref mut s)) => { - let sd = { s.readable(event_loop, &self.info).unwrap_or_else(|e| { + let sd = { s.readable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Session read error: {:?}", e); kill = true; SessionData::None @@ -578,32 +451,35 @@ impl Host { warn!(target: "net", "Received event for unknown connection"); } } + let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); if kill { - self.kill_connection(token, event_loop); + Host::kill_connection(token, &mut net_context, &mut self.handlers); } if create_session { - self.start_session(token, event_loop); + Host::start_session(&self.info, token, &mut net_context); } for p in ready_data { let mut h = self.handlers.get_mut(p).unwrap(); - h.connected(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize()); + net_context.set_protocol(p); + h.connected(&mut net_context, &token); } if let Some((p, packet_id, data)) = packet_data { let mut h = self.handlers.get_mut(p).unwrap(); - h.read(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize(), packet_id, &data[1..]); + net_context.set_protocol(p); + h.read(&mut net_context, &token, packet_id, &data[1..]); } - match self.connections.get_mut(token) { + match net_context.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.reregister(event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); + s.reregister(net_context.io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, _ => (), } } - fn start_session(&mut self, token: Token, event_loop: &mut EventLoop) { - let info = &self.info; - self.connections.replace_with(token, |c| { + fn start_session(info: &HostInfo, token: StreamToken, io: &mut NetworkContext) { + let event_loop = &mut io.io.event_loop; + io.connections.replace_with(Token(token), |c| { match c { ConnectionEntry::Handshake(h) => Session::new(h, event_loop, info) .map(|s| Some(ConnectionEntry::Session(s))) @@ -616,16 +492,17 @@ impl Host { }).expect("Error updating slab with session"); } - fn connection_timeout(&mut self, token: Token, event_loop: &mut EventLoop) { - self.kill_connection(token, event_loop) + fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: IoContext<'s, NetworkIoMessage>) { + let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); + Host::kill_connection(token, &mut net_context, &mut self.handlers) } - fn kill_connection(&mut self, token: Token, event_loop: &mut EventLoop) { + fn kill_connection(token: StreamToken, io: &mut NetworkContext, handlers: &mut HashMap>>) { let mut to_disconnect: Vec = Vec::new(); - match self.connections.get_mut(token) { + match io.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Handshake(_)) => (), // just abandon handshake Some(&mut ConnectionEntry::Session(ref mut s)) if s.is_ready() => { - for (p, _) in self.handlers.iter_mut() { + for (p, _) in handlers.iter_mut() { if s.have_capability(p) { to_disconnect.push(p); } @@ -634,87 +511,103 @@ impl Host { _ => (), } for p in to_disconnect { - let mut h = self.handlers.get_mut(p).unwrap(); - h.disconnected(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize()); + let mut h = handlers.get_mut(p).unwrap(); + io.set_protocol(p); + h.disconnected(io, &token); } - self.connections.remove(token); + io.connections.remove(Token(token)); } } -impl Handler for Host { - type Timeout = Token; - type Message = HostMessage; +impl IoHandler> for Host where Message: Send + 'static { + /// Initialize networking + fn initialize(&mut self, io: IoContext>) { + /* + match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { + Some(iface) => config.public_address = iface.addr.unwrap(), + None => warn!("No public network interface"), + */ - fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { - if events.is_hup() { - trace!(target: "net", "hup"); - match token.as_usize() { - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_closed(token, event_loop), - _ => warn!(target: "net", "Unexpected hup"), - }; - } - else if events.is_readable() { - match token.as_usize() { - TCP_ACCEPT => self.accept(event_loop), - IDLE => self.maintain_network(event_loop), - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(token, event_loop), - NODETABLE_RECEIVE => {}, - _ => panic!("Received unknown readable token"), - } - } - else if events.is_writable() { - match token.as_usize() { - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(token, event_loop), - _ => panic!("Received unknown writable token"), - } + // Start listening for incoming connections + io.event_loop.register(&self.listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap(); + io.event_loop.timeout_ms(Token(IDLE), 1000).unwrap(); //TODO: check delay + // open the udp socket + io.event_loop.register(&self.udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap(); + io.event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap(); + let port = self.info.config.listen_address.port(); + self.info.listen_port = port; + + self.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303"); + self.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@gav.ethdev.com:30300"); + self.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc@gav.ethdev.com:30303"); + self.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); + self.add_node("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034@54.207.93.166:30303"); + self.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@92.51.165.126:30303"); + } + + fn stream_hup(&mut self, io: IoContext>, stream: StreamToken) { + trace!(target: "net", "Hup: {}", stream); + match stream { + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_closed(stream, io), + _ => warn!(target: "net", "Unexpected hup"), + }; + } + + fn stream_readable(&mut self, io: IoContext>, stream: StreamToken) { + match stream { + TCP_ACCEPT => self.accept(io), + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(stream, io), + NODETABLE_RECEIVE => {}, + _ => panic!("Received unknown readable token"), } } - fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { - match token.as_usize() { - IDLE => self.maintain_network(event_loop), - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, event_loop), + fn stream_writable(&mut self, io: IoContext>, stream: StreamToken) { + match stream { + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(stream, io), + _ => panic!("Received unknown writable token"), + } + } + + fn timeout(&mut self, io: IoContext>, token: TimerToken) { + match token { + IDLE => self.maintain_network(io), + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, io), NODETABLE_DISCOVERY => {}, NODETABLE_MAINTAIN => {}, - USER_TIMER ... LAST_USER_TIMER => { - let (protocol, delay) = { - let timer = self.timers.get_mut(token).expect("Unknown user timer token"); - (timer.protocol, timer.delay) - }; + _ => { + let protocol: ProtocolId = self.timers.get_mut(&token).expect("Unknown user timer token"); match self.handlers.get_mut(protocol) { None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, Some(h) => { - h.timeout(&mut HostIo::new(protocol, None, event_loop, &mut self.connections, &mut self.timers), token.as_usize()); - event_loop.timeout_ms(token, delay).expect("Error re-registering user timer"); + h.timeout(&mut NetworkContext::new(io, Some(protocol), Some(token), &mut self.connections, &mut self.timers), token); } } } - _ => panic!("Unknown timer token"), } } - fn notify(&mut self, event_loop: &mut EventLoop, msg: Self::Message) { - match msg { - HostMessage::Shutdown => event_loop.shutdown(), - HostMessage::AddHandler { - handler, - protocol, - versions + fn message(&mut self, io: IoContext>, message: &mut NetworkIoMessage) { + match message { + &mut NetworkIoMessage::AddHandler { + ref mut handler, + ref protocol, + ref versions } => { - self.handlers.insert(protocol, handler); + self.handlers.insert(protocol, mem::replace(handler, None).unwrap()); for v in versions { - self.info.capabilities.push(CapabilityInfo { protocol: protocol, version: v, packet_count:0 }); + self.info.capabilities.push(CapabilityInfo { protocol: protocol, version: *v, packet_count:0 }); } }, - HostMessage::Send { - peer, - packet_id, - protocol, - data, + &mut NetworkIoMessage::Send { + ref peer, + ref packet_id, + ref protocol, + ref data, } => { - match self.connections.get_mut(Token(peer as usize)) { + match self.connections.get_mut(Token(*peer as usize)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.send_packet(protocol, packet_id as u8, &data).unwrap_or_else(|e| { + s.send_packet(protocol, *packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); }); //TODO: don't copy vector data }, @@ -723,10 +616,15 @@ impl Handler for Host { } } }, - HostMessage::UserMessage(message) => { + &mut NetworkIoMessage::User { + ref protocol, + ref message + } => { + let mut net_context = NetworkContext::new(io, None, None, &mut self.connections, &mut self.timers); for (p, h) in self.handlers.iter_mut() { - if p != &message.protocol { - h.message(&mut HostIo::new(message.protocol, None, event_loop, &mut self.connections, &mut self.timers), &message); + if p != protocol { + net_context.set_protocol(p); + h.message(&mut net_context, &message); } } } diff --git a/src/network/mod.rs b/src/network/mod.rs index cdce08d00..1a1450cb6 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -9,27 +9,27 @@ /// struct MyHandler; /// /// impl ProtocolHandler for MyHandler { -/// fn initialize(&mut self, io: &mut HandlerIo) { +/// fn initialize(&mut self, io: &mut NetworkContext) { /// io.register_timer(1000); /// } /// -/// fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]) { +/// fn read(&mut self, io: &mut NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { /// println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer); /// } /// -/// fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId) { +/// fn connected(&mut self, io: &mut NetworkContext, peer: &PeerId) { /// println!("Connected {}", peer); /// } /// -/// fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId) { +/// fn disconnected(&mut self, io: &mut NetworkContext, peer: &PeerId) { /// println!("Disconnected {}", peer); /// } /// -/// fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken) { +/// fn timeout(&mut self, io: &mut NetworkContext, timer: TimerToken) { /// println!("Timeout {}", timer); /// } /// -/// fn message(&mut self, io: &mut HandlerIo, message: &Message) { +/// fn message(&mut self, io: &mut NetworkContext, message: &Message) { /// println!("Message {}:{}", message.protocol, message.id); /// } /// } @@ -50,69 +50,31 @@ mod handshake; mod session; mod discovery; mod service; - -#[derive(Debug, Copy, Clone)] -pub enum DisconnectReason -{ - DisconnectRequested, - TCPError, - BadProtocol, - UselessPeer, - TooManyPeers, - DuplicatePeer, - IncompatibleProtocol, - NullIdentity, - ClientQuit, - UnexpectedIdentity, - LocalIdentity, - PingTimeout, -} - -#[derive(Debug)] -pub enum NetworkError { - Auth, - BadProtocol, - PeerNotFound, - Disconnect(DisconnectReason), - Mio(::std::io::Error), -} - -impl From<::rlp::DecoderError> for NetworkError { - fn from(_err: ::rlp::DecoderError) -> NetworkError { - NetworkError::Auth - } -} - -impl From<::mio::NotifyError> for NetworkError { - fn from(_err: ::mio::NotifyError) -> NetworkError { - NetworkError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) - } -} +mod error; +mod node; pub type PeerId = host::PeerId; pub type PacketId = host::PacketId; -pub type TimerToken = host::TimerToken; -pub type HandlerIo<'s> = host::HostIo<'s>; -pub type Message = host::UserMessage; -pub type MessageId = host::UserMessageId; +pub type NetworkContext<'s, Message> = host::NetworkContext<'s, Message>; +pub type NetworkService = service::NetworkService; +pub type NetworkIoMessage = host::NetworkIoMessage; +pub type NetworkError = error::NetworkError; + +use io::*; /// Network IO protocol handler. This needs to be implemented for each new subprotocol. -/// TODO: Separate p2p networking IO from IPC IO. `timeout` and `message` should go to a more genera IO provider. /// All the handler function are called from within IO event loop. -pub trait ProtocolHandler: Send { - /// Initialize the hadler - fn initialize(&mut self, io: &mut HandlerIo); +/// `Message` is the type for message data. +pub trait NetworkProtocolHandler: Send where Message: Send { /// Called when new network packet received. - fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]); + fn read(&mut self, io: &mut NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]); /// Called when new peer is connected. Only called when peer supports the same protocol. - fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId); + fn connected(&mut self, io: &mut NetworkContext, peer: &PeerId); /// Called when a previously connected peer disconnects. - fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId); - /// Timer function called after a timeout created with `HandlerIo::timeout`. - fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken); - /// Called when a broadcasted message is received. The message can only be sent from a different protocol handler. - fn message(&mut self, io: &mut HandlerIo, message: &Message); + fn disconnected(&mut self, io: &mut NetworkContext, peer: &PeerId); + /// Timer function called after a timeout created with `NetworkContext::timeout`. + fn timeout(&mut self, _io: &mut NetworkContext, _timer: TimerToken) {} + /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. + fn message(&mut self, _io: &mut NetworkContext, _message: &Message) {} } -pub type NetworkService = service::NetworkService; - diff --git a/src/network/service.rs b/src/network/service.rs index 23cbb57d9..be70e2ee1 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -1,32 +1,28 @@ -use std::thread::{self, JoinHandle}; -use mio::*; use error::*; -use network::{NetworkError, ProtocolHandler}; -use network::host::{Host, HostMessage, PeerId, PacketId, ProtocolId}; +use network::{NetworkProtocolHandler}; +use network::error::{NetworkError}; +use network::host::{Host, NetworkIoMessage, PeerId, PacketId, ProtocolId}; +use io::*; /// IO Service with networking -pub struct NetworkService { - thread: Option>, - host_channel: Sender +/// `Message` defines a notification data type. +pub struct NetworkService where Message: Send + 'static { + io_service: IoService>, } -impl NetworkService { +impl NetworkService where Message: Send + 'static { /// Starts IO event loop - pub fn start() -> Result { - let mut event_loop = EventLoop::new().unwrap(); - let channel = event_loop.channel(); - let thread = thread::spawn(move || { - Host::start(&mut event_loop).unwrap(); //TODO: - }); + pub fn start() -> Result, UtilError> { + let mut io_service = try!(IoService::>::start()); + try!(io_service.register_handler(Box::new(Host::new()))); Ok(NetworkService { - thread: Some(thread), - host_channel: channel + io_service: io_service }) } /// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads. pub fn send(&mut self, peer: &PeerId, packet_id: PacketId, protocol: ProtocolId, data: &[u8]) -> Result<(), NetworkError> { - try!(self.host_channel.send(HostMessage::Send { + try!(self.io_service.send_message(NetworkIoMessage::Send { peer: *peer, packet_id: packet_id, protocol: protocol, @@ -36,9 +32,9 @@ impl NetworkService { } /// Regiter a new protocol handler with the event loop. - pub fn register_protocol(&mut self, handler: Box, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> { - try!(self.host_channel.send(HostMessage::AddHandler { - handler: handler, + pub fn register_protocol(&mut self, handler: Box+Send>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> { + try!(self.io_service.send_message(NetworkIoMessage::AddHandler { + handler: Some(handler), protocol: protocol, versions: versions.to_vec(), })); @@ -46,10 +42,3 @@ impl NetworkService { } } -impl Drop for NetworkService { - fn drop(&mut self) { - self.host_channel.send(HostMessage::Shutdown).unwrap(); - self.thread.take().unwrap().join().unwrap(); - } -} - diff --git a/src/network/session.rs b/src/network/session.rs index 96390d366..2d8ba2245 100644 --- a/src/network/session.rs +++ b/src/network/session.rs @@ -4,8 +4,9 @@ use rlp::*; use network::connection::{EncryptedConnection, Packet}; use network::handshake::Handshake; use error::*; -use network::{NetworkError, DisconnectReason}; +use network::error::{NetworkError, DisconnectReason}; use network::host::*; +use network::node::NodeId; /// Peer session over encrypted connection. /// When created waits for Hello packet exchange and signals ready state. @@ -83,7 +84,7 @@ const PACKET_LAST: u8 = 0x7f; impl Session { /// Create a new session out of comepleted handshake. Consumes handshake object. - pub fn new(h: Handshake, event_loop: &mut EventLoop, host: &HostInfo) -> Result { + pub fn new>(h: Handshake, event_loop: &mut EventLoop, host: &HostInfo) -> Result { let id = h.id.clone(); let connection = try!(EncryptedConnection::new(h)); let mut session = Session { @@ -108,7 +109,7 @@ impl Session { } /// Readable IO handler. Returns packet data if available. - pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result { + pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result { match try!(self.connection.readable(event_loop)) { Some(data) => Ok(try!(self.read_packet(data, host))), None => Ok(SessionData::None) @@ -116,7 +117,7 @@ impl Session { } /// Writable IO handler. Sends pending packets. - pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { + pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { self.connection.writable(event_loop) } @@ -126,7 +127,7 @@ impl Session { } /// Update registration with the event loop. Should be called at the end of the IO handler. - pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.connection.reregister(event_loop) } From 28c482691a6e2b95c80d120d82c1fc693f30b427 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 13 Jan 2016 13:56:48 +0100 Subject: [PATCH 03/13] Fixed context lifetimes --- src/io/mod.rs | 87 +++++++++++++++++++------------ src/io/service.rs | 58 ++++++++++----------- src/lib.rs | 1 + src/network/host.rs | 122 ++++++++++++++++++++------------------------ src/network/mod.rs | 29 ++++++----- 5 files changed, 158 insertions(+), 139 deletions(-) diff --git a/src/io/mod.rs b/src/io/mod.rs index 72906b6f4..497e1ae78 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -3,47 +3,38 @@ /// Example usage for craeting a network service and adding an IO handler: /// /// ```rust -/// extern crate ethcore_util as util; -/// use util::network::*; +/// extern crate ethcore_util; +/// use ethcore_util::*; /// /// struct MyHandler; /// -/// impl ProtocolHandler for MyHandler { -/// fn initialize(&mut self, io: &mut HandlerIo) { -/// io.register_timer(1000); +/// struct MyMessage { +/// data: u32 +/// } +/// +/// impl IoHandler for MyHandler { +/// fn initialize(&mut self, io: &mut IoContext) { +/// io.register_timer(1000).unwrap(); /// } /// -/// fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]) { -/// println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer); -/// } -/// -/// fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId) { -/// println!("Connected {}", peer); -/// } -/// -/// fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId) { -/// println!("Disconnected {}", peer); -/// } -/// -/// fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken) { +/// fn timeout(&mut self, _io: &mut IoContext, timer: TimerToken) { /// println!("Timeout {}", timer); /// } /// -/// fn message(&mut self, io: &mut HandlerIo, message: &Message) { -/// println!("Message {}:{}", message.protocol, message.id); +/// fn message(&mut self, _io: &mut IoContext, message: &mut MyMessage) { +/// println!("Message {}", message.data); /// } -/// } +/// } /// /// fn main () { -/// let mut service = NetworkService::start().expect("Error creating network service"); -/// service.register_protocol(Box::new(MyHandler), "myproto", &[1u8]); +/// let mut service = IoService::::start().expect("Error creating network service"); +/// service.register_handler(Box::new(MyHandler)).unwrap(); /// /// // Wait for quit condition /// // ... /// // Drop the service /// } /// ``` -extern crate mio; mod service; #[derive(Debug)] @@ -51,8 +42,8 @@ pub enum IoError { Mio(::std::io::Error), } -impl From<::mio::NotifyError>> for IoError where M: Send { - fn from(_err: ::mio::NotifyError>) -> IoError { +impl From<::mio::NotifyError>> for IoError where Message: Send { + fn from(_err: ::mio::NotifyError>) -> IoError { IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) } } @@ -62,17 +53,17 @@ impl From<::mio::NotifyError>> for IoError where M: Sen /// `Message` type is used as notification data pub trait IoHandler: Send where Message: Send + 'static { /// Initialize the handler - fn initialize(&mut self, _io: IoContext) {} + fn initialize<'s>(&'s mut self, _io: &mut IoContext<'s, Message>) {} /// Timer function called after a timeout created with `HandlerIo::timeout`. - fn timeout(&mut self, _io: IoContext, _timer: TimerToken) {} + fn timeout<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _timer: TimerToken) {} /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. - fn message(&mut self, _io: IoContext, _message: &mut Message) {} // TODO: make message immutable and provide internal channel for adding network handler + fn message<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _message: &'s mut Message) {} // TODO: make message immutable and provide internal channel for adding network handler /// Called when an IO stream gets closed - fn stream_hup(&mut self, _io: IoContext, _stream: StreamToken) {} + fn stream_hup<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _stream: StreamToken) {} /// Called when an IO stream can be read from - fn stream_readable(&mut self, _io: IoContext, _stream: StreamToken) {} + fn stream_readable<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _stream: StreamToken) {} /// Called when an IO stream can be written to - fn stream_writable(&mut self, _io: IoContext, _stream: StreamToken) {} + fn stream_writable<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _stream: StreamToken) {} } pub type TimerToken = service::TimerToken; @@ -83,3 +74,35 @@ pub const USER_TOKEN_START: usize = service::USER_TOKEN; +#[cfg(test)] +mod tests { + + use io::*; + + struct MyHandler; + + struct MyMessage { + data: u32 + } + + impl IoHandler for MyHandler { + fn initialize(&mut self, io: &mut IoContext) { + io.register_timer(1000).unwrap(); + } + + fn timeout(&mut self, _io: &mut IoContext, timer: TimerToken) { + println!("Timeout {}", timer); + } + + fn message(&mut self, _io: &mut IoContext, message: &mut MyMessage) { + println!("Message {}", message.data); + } + } + + #[test] + fn test_service_register_handler () { + let mut service = IoService::::start().expect("Error creating network service"); + service.register_handler(Box::new(MyHandler)).unwrap(); + } + +} diff --git a/src/io/service.rs b/src/io/service.rs index 66814ba3c..aab8f5c2d 100644 --- a/src/io/service.rs +++ b/src/io/service.rs @@ -16,26 +16,26 @@ const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; pub const USER_TOKEN: usize = LAST_USER_TIMER + 1; /// Messages used to communicate with the event loop from other threads. -pub enum IoMessage where M: Send + Sized { +pub enum IoMessage where Message: Send + Sized { /// Shutdown the event loop Shutdown, /// Register a new protocol handler. AddHandler { - handler: Box+Send>, + handler: Box+Send>, }, /// Broadcast a message across all protocol handlers. - UserMessage(M) + UserMessage(Message) } /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. -pub struct IoContext<'s, M> where M: Send + 'static { +pub struct IoContext<'s, Message> where Message: Send + 'static { timers: &'s mut Slab, - pub event_loop: &'s mut EventLoop>, + pub event_loop: &'s mut EventLoop>, } -impl<'s, M> IoContext<'s, M> where M: Send + 'static { +impl<'s, Message> IoContext<'s, Message> where Message: Send + 'static { /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler. - fn new(event_loop: &'s mut EventLoop>, timers: &'s mut Slab) -> IoContext<'s, M> { + fn new(event_loop: &'s mut EventLoop>, timers: &'s mut Slab) -> IoContext<'s, Message> { IoContext { event_loop: event_loop, timers: timers, @@ -56,7 +56,7 @@ impl<'s, M> IoContext<'s, M> where M: Send + 'static { } /// Broadcast a message to other IO clients - pub fn message(&mut self, message: M) { + pub fn message(&mut self, message: Message) { match self.event_loop.channel().send(IoMessage::UserMessage(message)) { Ok(_) => {} Err(e) => { panic!("Error sending io message {:?}", e); } @@ -69,14 +69,14 @@ struct UserTimer { } /// Root IO handler. Manages user handlers, messages and IO timers. -pub struct IoManager where M: Send { +pub struct IoManager where Message: Send { timers: Slab, - handlers: Vec>>, + handlers: Vec>>, } -impl IoManager where M: Send + 'static { +impl IoManager where Message: Send + 'static { /// Creates a new instance and registers it with the event loop. - pub fn start(event_loop: &mut EventLoop>) -> Result<(), UtilError> { + pub fn start(event_loop: &mut EventLoop>) -> Result<(), UtilError> { let mut io = IoManager { timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS), handlers: Vec::new(), @@ -86,24 +86,24 @@ impl IoManager where M: Send + 'static { } } -impl Handler for IoManager where M: Send + 'static { +impl Handler for IoManager where Message: Send + 'static { type Timeout = Token; - type Message = IoMessage; + type Message = IoMessage; fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { if events.is_hup() { for h in self.handlers.iter_mut() { - h.stream_hup(IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.stream_hup(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } else if events.is_readable() { for h in self.handlers.iter_mut() { - h.stream_readable(IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.stream_readable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } else if events.is_writable() { for h in self.handlers.iter_mut() { - h.stream_writable(IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.stream_writable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } } @@ -116,13 +116,13 @@ impl Handler for IoManager where M: Send + 'static { timer.delay }; for h in self.handlers.iter_mut() { - h.timeout(IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); } event_loop.timeout_ms(token, delay).expect("Error re-registering user timer"); } _ => { // Just pass the event down. IoHandler is supposed to re-register it if required. for h in self.handlers.iter_mut() { - h.timeout(IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } } @@ -139,7 +139,7 @@ impl Handler for IoManager where M: Send + 'static { }, IoMessage::UserMessage(ref mut data) => { for h in self.handlers.iter_mut() { - h.message(IoContext::new(event_loop, &mut self.timers), data); + h.message(&mut IoContext::new(event_loop, &mut self.timers), data); } } } @@ -147,19 +147,19 @@ impl Handler for IoManager where M: Send + 'static { } /// General IO Service. Starts an event loop and dispatches IO requests. -/// 'M' is a notification message type -pub struct IoService where M: Send + 'static { +/// 'Message' is a notification message type +pub struct IoService where Message: Send + 'static { thread: Option>, - host_channel: Sender> + host_channel: Sender> } -impl IoService where M: Send + 'static { +impl IoService where Message: Send + 'static { /// Starts IO event loop - pub fn start() -> Result, UtilError> { + pub fn start() -> Result, UtilError> { let mut event_loop = EventLoop::new().unwrap(); let channel = event_loop.channel(); let thread = thread::spawn(move || { - IoManager::::start(&mut event_loop).unwrap(); //TODO: + IoManager::::start(&mut event_loop).unwrap(); //TODO: }); Ok(IoService { thread: Some(thread), @@ -168,7 +168,7 @@ impl IoService where M: Send + 'static { } /// Regiter a IO hadnler with the event loop. - pub fn register_handler(&mut self, handler: Box+Send>) -> Result<(), IoError> { + pub fn register_handler(&mut self, handler: Box+Send>) -> Result<(), IoError> { try!(self.host_channel.send(IoMessage::AddHandler { handler: handler, })); @@ -176,13 +176,13 @@ impl IoService where M: Send + 'static { } /// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads. - pub fn send_message(&mut self, message: M) -> Result<(), IoError> { + pub fn send_message(&mut self, message: Message) -> Result<(), IoError> { try!(self.host_channel.send(IoMessage::UserMessage(message))); Ok(()) } } -impl Drop for IoService where M: Send { +impl Drop for IoService where Message: Send { fn drop(&mut self) { self.host_channel.send(IoMessage::Shutdown).unwrap(); self.thread.take().unwrap().join().unwrap(); diff --git a/src/lib.rs b/src/lib.rs index acd96ee5f..fdab5e92a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,3 +89,4 @@ pub use heapsizeof::*; pub use squeeze::*; pub use semantic_version::*; pub use network::*; +pub use io::*; diff --git a/src/network/host.rs b/src/network/host.rs index d61ef614c..8917b1f8e 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -1,5 +1,5 @@ use std::mem; -use std::net::{SocketAddr }; +use std::net::{SocketAddr}; use std::collections::{HashMap}; use std::hash::{Hasher}; use std::str::{FromStr}; @@ -103,20 +103,20 @@ impl Encodable for CapabilityInfo { } /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. -pub struct NetworkContext<'s, Message> where Message: Send + 'static { - io: IoContext<'s, NetworkIoMessage>, - protocol: Option, +pub struct NetworkContext<'s, 'io, Message> where Message: Send + 'static, 'io: 's { + io: &'s mut IoContext<'io, NetworkIoMessage>, + protocol: ProtocolId, connections: &'s mut Slab, timers: &'s mut HashMap, session: Option, } -impl<'s, Message> NetworkContext<'s, Message> where Message: Send + 'static, { +impl<'s, 'io, Message> NetworkContext<'s, 'io, Message> where Message: Send + 'static, { /// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler. - fn new(io: IoContext<'s, NetworkIoMessage>, - protocol: Option, + fn new(io: &'s mut IoContext<'io, NetworkIoMessage>, + protocol: ProtocolId, session: Option, connections: &'s mut Slab, - timers: &'s mut HashMap) -> NetworkContext<'s, Message> { + timers: &'s mut HashMap) -> NetworkContext<'s, 'io, Message> { NetworkContext { io: io, protocol: protocol, @@ -126,15 +126,11 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + 'static, { } } - fn set_protocol(&mut self, protocol: ProtocolId) { - self.protocol = Some(protocol); - } - /// Send a packet over the network to another peer. pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { match self.connections.get_mut(Token(peer)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.send_packet(self.protocol.unwrap(), packet_id as u8, &data).unwrap_or_else(|e| { + s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); }); //TODO: don't copy vector data }, @@ -164,7 +160,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + 'static, { pub fn register_timer(&mut self, ms: u64) -> Result{ match self.io.register_timer(ms) { Ok(token) => { - self.timers.insert(token, self.protocol.unwrap()); + self.timers.insert(token, self.protocol); Ok(token) }, e @ Err(_) => e, @@ -259,7 +255,7 @@ impl Host where Message: Send { } } - fn maintain_network(&mut self, io: IoContext>) { + fn maintain_network(&mut self, io: &mut IoContext>) { self.connect_peers(io); } @@ -271,7 +267,7 @@ impl Host where Message: Send { self.connections.iter().any(|e| match e { &ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false }) } - fn connect_peers(&mut self, mut io: IoContext>) { + fn connect_peers(&mut self, io: &mut IoContext>) { struct NodeInfo { id: NodeId, peer_type: PeerType @@ -296,7 +292,7 @@ impl Host where Message: Send { for n in to_connect.iter() { if n.peer_type == PeerType::Required { if req_conn < IDEAL_PEERS { - self.connect_peer(&n.id, &mut io); + self.connect_peer(&n.id, io); } req_conn += 1; } @@ -311,7 +307,7 @@ impl Host where Message: Send { for n in to_connect.iter() { if n.peer_type == PeerType::Optional && open_slots > 0 { open_slots -= 1; - self.connect_peer(&n.id, &mut io); + self.connect_peer(&n.id, io); } } } @@ -362,11 +358,11 @@ impl Host where Message: Send { } - fn accept(&mut self, _io: IoContext>) { + fn accept(&mut self, _io: &mut IoContext>) { trace!(target: "net", "accept"); } - fn connection_writable<'s>(&'s mut self, token: StreamToken, io: IoContext<'s, NetworkIoMessage>) { + fn connection_writable<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { let mut kill = false; let mut create_session = false; match self.connections.get_mut(Token(token)) { @@ -387,27 +383,25 @@ impl Host where Message: Send { warn!(target: "net", "Received event for unknown connection"); } } - let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); if kill { - Host::kill_connection(token, &mut net_context, &mut self.handlers); + self.kill_connection(token, io); + return; + } else if create_session { + self.start_session(token, io); } - if create_session { - Host::start_session(&self.info, token, &mut net_context); - } - match net_context.connections.get_mut(Token(token)) { + match self.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.reregister(net_context.io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); + s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, _ => (), } } - fn connection_closed<'s>(&'s mut self, token: TimerToken, io: IoContext<'s, NetworkIoMessage>) { - let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); - Host::kill_connection(token, &mut net_context, &mut self.handlers); + fn connection_closed<'s>(&'s mut self, token: TimerToken, io: &mut IoContext<'s, NetworkIoMessage>) { + self.kill_connection(token, io); } - fn connection_readable<'s>(&'s mut self, token: StreamToken, io: IoContext<'s, NetworkIoMessage>) { + fn connection_readable<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { let mut kill = false; let mut create_session = false; let mut ready_data: Vec = Vec::new(); @@ -451,37 +445,35 @@ impl Host where Message: Send { warn!(target: "net", "Received event for unknown connection"); } } - let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); if kill { - Host::kill_connection(token, &mut net_context, &mut self.handlers); + self.kill_connection(token, io); + return; } if create_session { - Host::start_session(&self.info, token, &mut net_context); + self.start_session(token, io); } for p in ready_data { let mut h = self.handlers.get_mut(p).unwrap(); - net_context.set_protocol(p); - h.connected(&mut net_context, &token); + h.connected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token); } if let Some((p, packet_id, data)) = packet_data { let mut h = self.handlers.get_mut(p).unwrap(); - net_context.set_protocol(p); - h.read(&mut net_context, &token, packet_id, &data[1..]); + h.read(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token, packet_id, &data[1..]); } - match net_context.connections.get_mut(Token(token)) { + match self.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.reregister(net_context.io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); + s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, _ => (), } } - fn start_session(info: &HostInfo, token: StreamToken, io: &mut NetworkContext) { - let event_loop = &mut io.io.event_loop; - io.connections.replace_with(Token(token), |c| { + fn start_session(&mut self, token: StreamToken, io: &mut IoContext>) { + let info = &self.info; + self.connections.replace_with(Token(token), |c| { match c { - ConnectionEntry::Handshake(h) => Session::new(h, event_loop, info) + ConnectionEntry::Handshake(h) => Session::new(h, io.event_loop, info) .map(|s| Some(ConnectionEntry::Session(s))) .unwrap_or_else(|e| { debug!(target: "net", "Session construction error: {:?}", e); @@ -492,17 +484,16 @@ impl Host where Message: Send { }).expect("Error updating slab with session"); } - fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: IoContext<'s, NetworkIoMessage>) { - let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); - Host::kill_connection(token, &mut net_context, &mut self.handlers) + fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { + self.kill_connection(token, io) } - fn kill_connection(token: StreamToken, io: &mut NetworkContext, handlers: &mut HashMap>>) { + fn kill_connection<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { let mut to_disconnect: Vec = Vec::new(); - match io.connections.get_mut(Token(token)) { + match self.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Handshake(_)) => (), // just abandon handshake Some(&mut ConnectionEntry::Session(ref mut s)) if s.is_ready() => { - for (p, _) in handlers.iter_mut() { + for (p, _) in self.handlers.iter_mut() { if s.have_capability(p) { to_disconnect.push(p); } @@ -511,17 +502,16 @@ impl Host where Message: Send { _ => (), } for p in to_disconnect { - let mut h = handlers.get_mut(p).unwrap(); - io.set_protocol(p); - h.disconnected(io, &token); + let mut h = self.handlers.get_mut(p).unwrap(); + h.disconnected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token); } - io.connections.remove(Token(token)); + self.connections.remove(Token(token)); } } impl IoHandler> for Host where Message: Send + 'static { /// Initialize networking - fn initialize(&mut self, io: IoContext>) { + fn initialize(&mut self, io: &mut IoContext>) { /* match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { Some(iface) => config.public_address = iface.addr.unwrap(), @@ -545,7 +535,7 @@ impl IoHandler> for Host where Messa self.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@92.51.165.126:30303"); } - fn stream_hup(&mut self, io: IoContext>, stream: StreamToken) { + fn stream_hup<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, stream: StreamToken) { trace!(target: "net", "Hup: {}", stream); match stream { FIRST_CONNECTION ... LAST_CONNECTION => self.connection_closed(stream, io), @@ -553,7 +543,7 @@ impl IoHandler> for Host where Messa }; } - fn stream_readable(&mut self, io: IoContext>, stream: StreamToken) { + fn stream_readable<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, stream: StreamToken) { match stream { TCP_ACCEPT => self.accept(io), FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(stream, io), @@ -562,39 +552,41 @@ impl IoHandler> for Host where Messa } } - fn stream_writable(&mut self, io: IoContext>, stream: StreamToken) { + fn stream_writable<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, stream: StreamToken) { match stream { FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(stream, io), _ => panic!("Received unknown writable token"), } } - fn timeout(&mut self, io: IoContext>, token: TimerToken) { + fn timeout<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, token: TimerToken) { match token { IDLE => self.maintain_network(io), FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, io), NODETABLE_DISCOVERY => {}, NODETABLE_MAINTAIN => {}, _ => { - let protocol: ProtocolId = self.timers.get_mut(&token).expect("Unknown user timer token"); + let protocol = *self.timers.get_mut(&token).expect("Unknown user timer token"); match self.handlers.get_mut(protocol) { None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, Some(h) => { - h.timeout(&mut NetworkContext::new(io, Some(protocol), Some(token), &mut self.connections, &mut self.timers), token); + h.timeout(&mut NetworkContext::new(io, protocol, Some(token), &mut self.connections, &mut self.timers), token); } } } } } - fn message(&mut self, io: IoContext>, message: &mut NetworkIoMessage) { + fn message<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, message: &'s mut NetworkIoMessage) { match message { &mut NetworkIoMessage::AddHandler { ref mut handler, ref protocol, ref versions } => { - self.handlers.insert(protocol, mem::replace(handler, None).unwrap()); + let mut h = mem::replace(handler, None).unwrap(); + h.initialize(&mut NetworkContext::new(io, protocol, None, &mut self.connections, &mut self.timers)); + self.handlers.insert(protocol, h); for v in versions { self.info.capabilities.push(CapabilityInfo { protocol: protocol, version: *v, packet_count:0 }); } @@ -620,11 +612,9 @@ impl IoHandler> for Host where Messa ref protocol, ref message } => { - let mut net_context = NetworkContext::new(io, None, None, &mut self.connections, &mut self.timers); for (p, h) in self.handlers.iter_mut() { if p != protocol { - net_context.set_protocol(p); - h.message(&mut net_context, &message); + h.message(&mut NetworkContext::new(io, p, None, &mut self.connections, &mut self.timers), &message); } } } diff --git a/src/network/mod.rs b/src/network/mod.rs index 1a1450cb6..7bc7ee71c 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -4,38 +4,42 @@ /// /// ```rust /// extern crate ethcore_util as util; -/// use util::network::*; +/// use util::*; /// /// struct MyHandler; /// -/// impl ProtocolHandler for MyHandler { -/// fn initialize(&mut self, io: &mut NetworkContext) { +/// struct MyMessage { +/// data: u32 +/// } +/// +/// impl NetworkProtocolHandler for MyHandler { +/// fn initialize(&mut self, io: &mut NetworkContext) { /// io.register_timer(1000); /// } /// -/// fn read(&mut self, io: &mut NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { +/// fn read(&mut self, io: &mut NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { /// println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer); /// } /// -/// fn connected(&mut self, io: &mut NetworkContext, peer: &PeerId) { +/// fn connected(&mut self, io: &mut NetworkContext, peer: &PeerId) { /// println!("Connected {}", peer); /// } /// -/// fn disconnected(&mut self, io: &mut NetworkContext, peer: &PeerId) { +/// fn disconnected(&mut self, io: &mut NetworkContext, peer: &PeerId) { /// println!("Disconnected {}", peer); /// } /// -/// fn timeout(&mut self, io: &mut NetworkContext, timer: TimerToken) { +/// fn timeout(&mut self, io: &mut NetworkContext, timer: TimerToken) { /// println!("Timeout {}", timer); /// } /// -/// fn message(&mut self, io: &mut NetworkContext, message: &Message) { -/// println!("Message {}:{}", message.protocol, message.id); +/// fn message(&mut self, io: &mut NetworkContext, message: &MyMessage) { +/// println!("Message {}", message.data); /// } /// } /// /// fn main () { -/// let mut service = NetworkService::start().expect("Error creating network service"); +/// let mut service = NetworkService::::start().expect("Error creating network service"); /// service.register_protocol(Box::new(MyHandler), "myproto", &[1u8]); /// /// // Wait for quit condition @@ -43,7 +47,6 @@ /// // Drop the service /// } /// ``` -extern crate mio; mod host; mod connection; mod handshake; @@ -55,7 +58,7 @@ mod node; pub type PeerId = host::PeerId; pub type PacketId = host::PacketId; -pub type NetworkContext<'s, Message> = host::NetworkContext<'s, Message>; +pub type NetworkContext<'s,'io, Message> = host::NetworkContext<'s, 'io, Message>; pub type NetworkService = service::NetworkService; pub type NetworkIoMessage = host::NetworkIoMessage; pub type NetworkError = error::NetworkError; @@ -66,6 +69,8 @@ use io::*; /// All the handler function are called from within IO event loop. /// `Message` is the type for message data. pub trait NetworkProtocolHandler: Send where Message: Send { + /// Initialize the handler + fn initialize(&mut self, _io: &mut NetworkContext) {} /// Called when new network packet received. fn read(&mut self, io: &mut NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]); /// Called when new peer is connected. Only called when peer supports the same protocol. From a5bb7b7f921fab34f78c61882c7eb309c27bd2ba Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 13 Jan 2016 15:08:36 +0100 Subject: [PATCH 04/13] Work around ICE --- src/io/mod.rs | 4 +--- src/io/service.rs | 2 +- src/network/host.rs | 15 ++++++++------- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/io/mod.rs b/src/io/mod.rs index 497e1ae78..86581b2c3 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -70,9 +70,7 @@ pub type TimerToken = service::TimerToken; pub type StreamToken = service::StreamToken; pub type IoContext<'s, M> = service::IoContext<'s, M>; pub type IoService = service::IoService; -pub const USER_TOKEN_START: usize = service::USER_TOKEN; - - +//pub const USER_TOKEN_START: usize = service::USER_TOKEN; // TODO: ICE in rustc 1.7.0-nightly (49c382779 2016-01-12) #[cfg(test)] mod tests { diff --git a/src/io/service.rs b/src/io/service.rs index aab8f5c2d..f3fe9c020 100644 --- a/src/io/service.rs +++ b/src/io/service.rs @@ -13,7 +13,7 @@ pub type StreamToken = usize; const MAX_USER_TIMERS: usize = 32; const USER_TIMER: usize = 0; const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; -pub const USER_TOKEN: usize = LAST_USER_TIMER + 1; +//const USER_TOKEN: usize = LAST_USER_TIMER + 1; /// Messages used to communicate with the event loop from other threads. pub enum IoMessage where Message: Send + Sized { diff --git a/src/network/host.rs b/src/network/host.rs index 8917b1f8e..a807804c6 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -45,12 +45,13 @@ impl NetworkConfiguration { } // Tokens -const TOKEN_BEGIN: usize = USER_TOKEN_START; -const TCP_ACCEPT: usize = TOKEN_BEGIN; -const IDLE: usize = TOKEN_BEGIN + 1; -const NODETABLE_RECEIVE: usize = TOKEN_BEGIN + 2; -const NODETABLE_MAINTAIN: usize = TOKEN_BEGIN + 3; -const NODETABLE_DISCOVERY: usize = TOKEN_BEGIN + 4; +//const TOKEN_BEGIN: usize = USER_TOKEN_START; // TODO: ICE in rustc 1.7.0-nightly (49c382779 2016-01-12) +const TOKEN_BEGIN: usize = 32; +const TCP_ACCEPT: usize = TOKEN_BEGIN + 1; +const IDLE: usize = TOKEN_BEGIN + 2; +const NODETABLE_RECEIVE: usize = TOKEN_BEGIN + 3; +const NODETABLE_MAINTAIN: usize = TOKEN_BEGIN + 4; +const NODETABLE_DISCOVERY: usize = TOKEN_BEGIN + 5; const FIRST_CONNECTION: usize = TOKEN_BEGIN + 16; const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1; @@ -545,9 +546,9 @@ impl IoHandler> for Host where Messa fn stream_readable<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, stream: StreamToken) { match stream { - TCP_ACCEPT => self.accept(io), FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(stream, io), NODETABLE_RECEIVE => {}, + TCP_ACCEPT => self.accept(io), _ => panic!("Received unknown readable token"), } } From 4d2437906effd9f6ebe3e84cad3a29d216f29931 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 13 Jan 2016 23:13:57 +0100 Subject: [PATCH 05/13] Io channel --- src/io/mod.rs | 1 + src/io/service.rs | 18 ++++++++++++++++++ src/network/host.rs | 14 +++----------- src/network/mod.rs | 1 + src/network/service.rs | 5 +++++ 5 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/io/mod.rs b/src/io/mod.rs index 86581b2c3..23a8509cc 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -70,6 +70,7 @@ pub type TimerToken = service::TimerToken; pub type StreamToken = service::StreamToken; pub type IoContext<'s, M> = service::IoContext<'s, M>; pub type IoService = service::IoService; +pub type IoChannel = service::IoChannel; //pub const USER_TOKEN_START: usize = service::USER_TOKEN; // TODO: ICE in rustc 1.7.0-nightly (49c382779 2016-01-12) #[cfg(test)] diff --git a/src/io/service.rs b/src/io/service.rs index f3fe9c020..b8f300670 100644 --- a/src/io/service.rs +++ b/src/io/service.rs @@ -146,6 +146,19 @@ impl Handler for IoManager where Message: Send + 'static { } } +/// Allows sending messages into the event loop. All the IO handlers will get the message +/// in the `message` callback. +pub struct IoChannel where Message: Send { + channel: Sender> +} + +impl IoChannel where Message: Send { + pub fn send(&mut self, message: Message) -> Result<(), IoError> { + try!(self.channel.send(IoMessage::UserMessage(message))); + Ok(()) + } +} + /// General IO Service. Starts an event loop and dispatches IO requests. /// 'Message' is a notification message type pub struct IoService where Message: Send + 'static { @@ -180,6 +193,11 @@ impl IoService where Message: Send + 'static { try!(self.host_channel.send(IoMessage::UserMessage(message))); Ok(()) } + + /// Create a new message channel + pub fn channel(&mut self) -> IoChannel { + IoChannel { channel: self.host_channel.clone() } + } } impl Drop for IoService where Message: Send { diff --git a/src/network/host.rs b/src/network/host.rs index a807804c6..4362f0d53 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -76,10 +76,7 @@ pub enum NetworkIoMessage where Message: Send { data: Vec, }, /// User message - User { - protocol: ProtocolId, - message: Message, - }, + User(Message), } /// Local (temporary) peer session ID. @@ -609,14 +606,9 @@ impl IoHandler> for Host where Messa } } }, - &mut NetworkIoMessage::User { - ref protocol, - ref message - } => { + &mut NetworkIoMessage::User(ref message) => { for (p, h) in self.handlers.iter_mut() { - if p != protocol { - h.message(&mut NetworkContext::new(io, p, None, &mut self.connections, &mut self.timers), &message); - } + h.message(&mut NetworkContext::new(io, p, None, &mut self.connections, &mut self.timers), &message); } } } diff --git a/src/network/mod.rs b/src/network/mod.rs index 7bc7ee71c..a47e88927 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -61,6 +61,7 @@ pub type PacketId = host::PacketId; pub type NetworkContext<'s,'io, Message> = host::NetworkContext<'s, 'io, Message>; pub type NetworkService = service::NetworkService; pub type NetworkIoMessage = host::NetworkIoMessage; +pub use network::host::NetworkIoMessage::User as UserMessage; pub type NetworkError = error::NetworkError; use io::*; diff --git a/src/network/service.rs b/src/network/service.rs index be70e2ee1..401c74634 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -40,5 +40,10 @@ impl NetworkService where Message: Send + 'static { })); Ok(()) } + + pub fn io(&mut self) -> &mut IoService> { + &mut self.io_service + } + } From a2f13e1efb7fe30401c8aa4cfbcf1d759ce48998 Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 14 Jan 2016 16:52:10 +0100 Subject: [PATCH 06/13] Minor fixes --- src/io/service.rs | 1 + src/network/host.rs | 35 ++++++++++++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/io/service.rs b/src/io/service.rs index b8f300670..1512b958e 100644 --- a/src/io/service.rs +++ b/src/io/service.rs @@ -136,6 +136,7 @@ impl Handler for IoManager where Message: Send + 'static { handler, } => { self.handlers.push(handler); + self.handlers.last_mut().unwrap().initialize(&mut IoContext::new(event_loop, &mut self.timers)); }, IoMessage::UserMessage(ref mut data) => { for h in self.handlers.iter_mut() { diff --git a/src/network/host.rs b/src/network/host.rs index 4362f0d53..7a43b6959 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -23,6 +23,8 @@ const _DEFAULT_PORT: u16 = 30304; const MAX_CONNECTIONS: usize = 1024; const IDEAL_PEERS: u32 = 10; +const MAINTENANCE_TIMEOUT: u64 = 1000; + #[derive(Debug)] struct NetworkConfiguration { listen_address: SocketAddr, @@ -164,6 +166,18 @@ impl<'s, 'io, Message> NetworkContext<'s, 'io, Message> where Message: Send + 's e @ Err(_) => e, } } + + /// Returns peer identification string + pub fn peer_info(&self, peer: PeerId) -> String { + match self.connections.get(Token(peer)) { + Some(&ConnectionEntry::Session(ref s)) => { + s.info.client_version.clone() + }, + _ => { + "unknown".to_string() + } + } + } } /// Shared host information @@ -255,6 +269,7 @@ impl Host where Message: Send { fn maintain_network(&mut self, io: &mut IoContext>) { self.connect_peers(io); + io.event_loop.timeout_ms(Token(IDLE), MAINTENANCE_TIMEOUT).unwrap(); } fn have_session(&self, id: &NodeId) -> bool { @@ -340,6 +355,7 @@ impl Host where Message: Send { let nonce = self.info.next_nonce(); match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"))) { Some(token) => { + warn!(target: "slab", "inserted {}", token.as_usize()); match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { h.start(&self.info, true) @@ -469,6 +485,21 @@ impl Host where Message: Send { fn start_session(&mut self, token: StreamToken, io: &mut IoContext>) { let info = &self.info; + // TODO: use slab::replace_with (currently broken) + match self.connections.remove(Token(token)) { + Some(ConnectionEntry::Handshake(h)) => { + match Session::new(h, io.event_loop, info) { + Ok(session) => { + assert!(Token(token) == self.connections.insert(ConnectionEntry::Session(session)).ok().unwrap()); + }, + Err(e) => { + debug!(target: "net", "Session construction error: {:?}", e); + } + } + }, + _ => panic!("Error updating slab with session") + } + /* self.connections.replace_with(Token(token), |c| { match c { ConnectionEntry::Handshake(h) => Session::new(h, io.event_loop, info) @@ -480,6 +511,7 @@ impl Host where Message: Send { _ => { panic!("No handshake to create a session from"); } } }).expect("Error updating slab with session"); + */ } fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { @@ -504,6 +536,7 @@ impl Host where Message: Send { h.disconnected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token); } self.connections.remove(Token(token)); + warn!(target: "slab", "removed {}", token); } } @@ -518,7 +551,7 @@ impl IoHandler> for Host where Messa // Start listening for incoming connections io.event_loop.register(&self.listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap(); - io.event_loop.timeout_ms(Token(IDLE), 1000).unwrap(); //TODO: check delay + io.event_loop.timeout_ms(Token(IDLE), MAINTENANCE_TIMEOUT).unwrap(); // open the udp socket io.event_loop.register(&self.udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap(); io.event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap(); From db25f7e590984900484c4ab8f2a79f6818e48ec8 Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 14 Jan 2016 16:56:59 +0100 Subject: [PATCH 07/13] Minor fixes --- src/hash.rs | 10 ++++++++++ src/network/host.rs | 2 -- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/hash.rs b/src/hash.rs index f67833072..e3c05a79a 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -444,6 +444,16 @@ impl From for Address { } } } + +impl From for H64 { + fn from(value: H256) -> H64 { + unsafe { + let mut ret: H64 = ::std::mem::uninitialized(); + ::std::ptr::copy(value.as_ptr().offset(20), ret.as_mut_ptr(), 8); + ret + } + } +} /* impl<'_> From<&'_ H256> for Address { fn from(value: &'_ H256) -> Address { diff --git a/src/network/host.rs b/src/network/host.rs index 7a43b6959..b562ca765 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -355,7 +355,6 @@ impl Host where Message: Send { let nonce = self.info.next_nonce(); match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"))) { Some(token) => { - warn!(target: "slab", "inserted {}", token.as_usize()); match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { h.start(&self.info, true) @@ -536,7 +535,6 @@ impl Host where Message: Send { h.disconnected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token); } self.connections.remove(Token(token)); - warn!(target: "slab", "removed {}", token); } } From df2e9854c7524cb72cc396ab74fae81bc65de836 Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 14 Jan 2016 19:04:13 +0100 Subject: [PATCH 08/13] Host info --- src/network/host.rs | 2 +- src/network/service.rs | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/network/host.rs b/src/network/host.rs index b562ca765..abe1cbfcc 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -223,7 +223,7 @@ enum ConnectionEntry { /// Root IO handler. Manages protocol handlers, IO timers and network connections. pub struct Host where Message: Send { - info: HostInfo, + pub info: HostInfo, udp_socket: UdpSocket, listener: TcpListener, connections: Slab, diff --git a/src/network/service.rs b/src/network/service.rs index 401c74634..48cc11152 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -8,15 +8,19 @@ use io::*; /// `Message` defines a notification data type. pub struct NetworkService where Message: Send + 'static { io_service: IoService>, + host_info: String, } impl NetworkService where Message: Send + 'static { /// Starts IO event loop pub fn start() -> Result, UtilError> { let mut io_service = try!(IoService::>::start()); - try!(io_service.register_handler(Box::new(Host::new()))); + let host = Box::new(Host::new()); + let host_info = host.info.client_version.clone(); + try!(io_service.register_handler(host)); Ok(NetworkService { - io_service: io_service + io_service: io_service, + host_info: host_info, }) } @@ -41,9 +45,16 @@ impl NetworkService where Message: Send + 'static { Ok(()) } + /// Returns host identifier string as advertised to other peers + pub fn host_info(&self) -> String { + self.host_info.clone() + } + + /// Returns underlying io service. pub fn io(&mut self) -> &mut IoService> { &mut self.io_service } + } From 2d36062794e1bc3d92959c12ce8e308ad1d3be32 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 15 Jan 2016 00:50:48 +0100 Subject: [PATCH 09/13] Slab bug workaround --- Cargo.toml | 1 + src/lib.rs | 1 + src/network/host.rs | 47 +++++++++++++++++++++++++----------------- src/network/session.rs | 1 + 4 files changed, 31 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6e32edcab..b7dca6e1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ rust-crypto = "0.2.34" elastic-array = "0.4" heapsize = "0.2" itertools = "0.4" +slab = { git = "https://github.com/arkpar/slab.git" } [dev-dependencies] json-tests = { path = "json-tests" } diff --git a/src/lib.rs b/src/lib.rs index 4b0108c6f..fca71a100 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,7 @@ //! sudo make install //! ``` +extern crate slab; extern crate rustc_serialize; extern crate mio; extern crate rand; diff --git a/src/network/host.rs b/src/network/host.rs index abe1cbfcc..6b3dfbfd1 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -4,7 +4,6 @@ use std::collections::{HashMap}; use std::hash::{Hasher}; use std::str::{FromStr}; use mio::*; -use mio::util::{Slab}; use mio::tcp::*; use mio::udp::*; use hash::*; @@ -17,6 +16,9 @@ use error::*; use io::*; use network::NetworkProtocolHandler; use network::node::*; +use slab::Index; + +type Slab = ::slab::Slab; const _DEFAULT_PORT: u16 = 30304; @@ -128,7 +130,7 @@ impl<'s, 'io, Message> NetworkContext<'s, 'io, Message> where Message: Send + 's /// Send a packet over the network to another peer. pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { - match self.connections.get_mut(Token(peer)) { + match self.connections.get_mut(peer) { Some(&mut ConnectionEntry::Session(ref mut s)) => { s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); @@ -169,7 +171,7 @@ impl<'s, 'io, Message> NetworkContext<'s, 'io, Message> where Message: Send + 's /// Returns peer identification string pub fn peer_info(&self, peer: PeerId) -> String { - match self.connections.get(Token(peer)) { + match self.connections.get(peer) { Some(&ConnectionEntry::Session(ref s)) => { s.info.client_version.clone() }, @@ -251,7 +253,7 @@ impl Host where Message: Send { }, udp_socket: udp_socket, listener: listener, - connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS), + connections: Slab::new_starting_at(FIRST_CONNECTION, MAX_CONNECTIONS), timers: HashMap::new(), nodes: HashMap::new(), handlers: HashMap::new(), @@ -353,8 +355,9 @@ impl Host where Message: Send { }; let nonce = self.info.next_nonce(); - match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"))) { + match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(Token(token), id, socket, &nonce).expect("Can't create handshake"))) { Some(token) => { + warn!(target: "slab", "inserted {}", token.as_usize()); match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { h.start(&self.info, true) @@ -378,7 +381,7 @@ impl Host where Message: Send { fn connection_writable<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { let mut kill = false; let mut create_session = false; - match self.connections.get_mut(Token(token)) { + match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { h.writable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Handshake write error: {:?}", e); @@ -402,7 +405,7 @@ impl Host where Message: Send { } else if create_session { self.start_session(token, io); } - match self.connections.get_mut(Token(token)) { + match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Session(ref mut s)) => { s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, @@ -419,7 +422,7 @@ impl Host where Message: Send { let mut create_session = false; let mut ready_data: Vec = Vec::new(); let mut packet_data: Option<(ProtocolId, PacketId, Vec)> = None; - match self.connections.get_mut(Token(token)) { + match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { h.readable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Handshake read error: {:?}", e); @@ -474,7 +477,7 @@ impl Host where Message: Send { h.read(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token, packet_id, &data[1..]); } - match self.connections.get_mut(Token(token)) { + match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Session(ref mut s)) => { s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, @@ -485,11 +488,12 @@ impl Host where Message: Send { fn start_session(&mut self, token: StreamToken, io: &mut IoContext>) { let info = &self.info; // TODO: use slab::replace_with (currently broken) - match self.connections.remove(Token(token)) { + /* + match self.connections.remove(token) { Some(ConnectionEntry::Handshake(h)) => { match Session::new(h, io.event_loop, info) { Ok(session) => { - assert!(Token(token) == self.connections.insert(ConnectionEntry::Session(session)).ok().unwrap()); + assert!(token == self.connections.insert(ConnectionEntry::Session(session)).ok().unwrap()); }, Err(e) => { debug!(target: "net", "Session construction error: {:?}", e); @@ -497,9 +501,9 @@ impl Host where Message: Send { } }, _ => panic!("Error updating slab with session") - } - /* - self.connections.replace_with(Token(token), |c| { + }*/ + warn!(target: "slab", "replaced {}", token.as_usize()); + self.connections.replace_with(token, |c| { match c { ConnectionEntry::Handshake(h) => Session::new(h, io.event_loop, info) .map(|s| Some(ConnectionEntry::Session(s))) @@ -510,7 +514,6 @@ impl Host where Message: Send { _ => { panic!("No handshake to create a session from"); } } }).expect("Error updating slab with session"); - */ } fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { @@ -519,7 +522,8 @@ impl Host where Message: Send { fn kill_connection<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { let mut to_disconnect: Vec = Vec::new(); - match self.connections.get_mut(Token(token)) { + let mut remove = true; + match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(_)) => (), // just abandon handshake Some(&mut ConnectionEntry::Session(ref mut s)) if s.is_ready() => { for (p, _) in self.handlers.iter_mut() { @@ -528,13 +532,18 @@ impl Host where Message: Send { } } }, - _ => (), + _ => { + remove = false; + }, } for p in to_disconnect { let mut h = self.handlers.get_mut(p).unwrap(); h.disconnected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token); } - self.connections.remove(Token(token)); + if remove { + self.connections.remove(token); + warn!(target: "slab", "removed {}", token); + } } } @@ -626,7 +635,7 @@ impl IoHandler> for Host where Messa ref protocol, ref data, } => { - match self.connections.get_mut(Token(*peer as usize)) { + match self.connections.get_mut(*peer as usize) { Some(&mut ConnectionEntry::Session(ref mut s)) => { s.send_packet(protocol, *packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); diff --git a/src/network/session.rs b/src/network/session.rs index 2d8ba2245..828e4b062 100644 --- a/src/network/session.rs +++ b/src/network/session.rs @@ -242,6 +242,7 @@ impl Session { i += 1; } trace!(target: "net", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps); + self.info.client_version = client_version; self.info.capabilities = caps; if protocol != host.protocol_version { return Err(From::from(self.disconnect(DisconnectReason::UselessPeer))); From 825f3733fd5efdb0c3ab1a1699827e1b0488cade Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 15 Jan 2016 00:52:21 +0100 Subject: [PATCH 10/13] Removed debug output --- src/network/host.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/network/host.rs b/src/network/host.rs index 6b3dfbfd1..9c6b8a557 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -357,7 +357,6 @@ impl Host where Message: Send { let nonce = self.info.next_nonce(); match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(Token(token), id, socket, &nonce).expect("Can't create handshake"))) { Some(token) => { - warn!(target: "slab", "inserted {}", token.as_usize()); match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { h.start(&self.info, true) @@ -502,7 +501,6 @@ impl Host where Message: Send { }, _ => panic!("Error updating slab with session") }*/ - warn!(target: "slab", "replaced {}", token.as_usize()); self.connections.replace_with(token, |c| { match c { ConnectionEntry::Handshake(h) => Session::new(h, io.event_loop, info) @@ -542,7 +540,6 @@ impl Host where Message: Send { } if remove { self.connections.remove(token); - warn!(target: "slab", "removed {}", token); } } } From 8dd4b3689afe74a47db3d7eb07edc2057ecd56bb Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 15 Jan 2016 00:54:43 +0100 Subject: [PATCH 11/13] Removed unused import --- src/network/host.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/network/host.rs b/src/network/host.rs index 9c6b8a557..bcee38aad 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -16,7 +16,6 @@ use error::*; use io::*; use network::NetworkProtocolHandler; use network::node::*; -use slab::Index; type Slab = ::slab::Slab; From 2319fd4cecb921d93f0f54a3d184135e263b318c Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 15 Jan 2016 16:19:21 +0100 Subject: [PATCH 12/13] New list of bootnodes --- src/io/service.rs | 3 ++- src/network/host.rs | 15 ++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/io/service.rs b/src/io/service.rs index 1512b958e..4ccfb2407 100644 --- a/src/io/service.rs +++ b/src/io/service.rs @@ -30,6 +30,7 @@ pub enum IoMessage where Message: Send + Sized { /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. pub struct IoContext<'s, Message> where Message: Send + 'static { timers: &'s mut Slab, + /// Low leve MIO Event loop for custom handler registration. pub event_loop: &'s mut EventLoop>, } @@ -43,7 +44,7 @@ impl<'s, Message> IoContext<'s, Message> where Message: Send + 'static { } /// Register a new IO timer. Returns a new timer token. 'IoHandler::timeout' will be called with the token. - pub fn register_timer(&mut self, ms: u64) -> Result{ + pub fn register_timer(&mut self, ms: u64) -> Result { match self.timers.insert(UserTimer { delay: ms, }) { diff --git a/src/network/host.rs b/src/network/host.rs index bcee38aad..8b1038c8a 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -164,7 +164,7 @@ impl<'s, 'io, Message> NetworkContext<'s, 'io, Message> where Message: Send + 's self.timers.insert(token, self.protocol); Ok(token) }, - e @ Err(_) => e, + e => e, } } @@ -561,12 +561,13 @@ impl IoHandler> for Host where Messa let port = self.info.config.listen_address.port(); self.info.listen_port = port; - self.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303"); - self.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@gav.ethdev.com:30300"); - self.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc@gav.ethdev.com:30303"); - self.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); - self.add_node("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034@54.207.93.166:30303"); - self.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@92.51.165.126:30303"); + //self.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303"); + // GO bootnodes + self.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); // IE + self.add_node("enode://de471bccee3d042261d52e9bff31458daecc406142b401d4cd848f677479f73104b9fdeb090af9583d3391b7f10cb2ba9e26865dd5fca4fcdc0fb1e3b723c786@54.94.239.50:30303"); // BR + self.add_node("enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303"); // SG + // ETH/DEV cpp-ethereum (poc-9.ethdev.com) + self.add_node("enode://979b7fa28feeb35a4741660a16076f1943202cb72b6af70d327f053e248bab9ba81760f39d0701ef1d8f89cc1fbd2cacba0710a12cd5314d5e0c9021aa3637f9@5.1.83.226:30303"); } fn stream_hup<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, stream: StreamToken) { From 3ec294bda2eefe03a4721415773f971111baebf2 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 15 Jan 2016 16:36:08 +0100 Subject: [PATCH 13/13] Missing files --- src/network/error.rs | 41 ++++++++++++++++++++++ src/network/node.rs | 83 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+) create mode 100644 src/network/error.rs create mode 100644 src/network/node.rs diff --git a/src/network/error.rs b/src/network/error.rs new file mode 100644 index 000000000..d255cb043 --- /dev/null +++ b/src/network/error.rs @@ -0,0 +1,41 @@ +use io::IoError; +use rlp::*; + +#[derive(Debug, Copy, Clone)] +pub enum DisconnectReason +{ + DisconnectRequested, + //TCPError, + //BadProtocol, + UselessPeer, + //TooManyPeers, + //DuplicatePeer, + //IncompatibleProtocol, + //NullIdentity, + //ClientQuit, + //UnexpectedIdentity, + //LocalIdentity, + //PingTimeout, +} + +#[derive(Debug)] +pub enum NetworkError { + Auth, + BadProtocol, + PeerNotFound, + Disconnect(DisconnectReason), + Io(IoError), +} + +impl From for NetworkError { + fn from(_err: DecoderError) -> NetworkError { + NetworkError::Auth + } +} + +impl From for NetworkError { + fn from(err: IoError) -> NetworkError { + NetworkError::Io(err) + } +} + diff --git a/src/network/node.rs b/src/network/node.rs new file mode 100644 index 000000000..5c08e0a66 --- /dev/null +++ b/src/network/node.rs @@ -0,0 +1,83 @@ +use std::net::{SocketAddr, ToSocketAddrs}; +use std::hash::{Hash, Hasher}; +use std::str::{FromStr}; +use hash::*; +use rlp::*; +use time::Tm; +use error::*; + +/// Node public key +pub type NodeId = H512; + +#[derive(Debug)] +/// Noe address info +pub struct NodeEndpoint { + /// IP(V4 or V6) address + pub address: SocketAddr, + /// Address as string (can be host name). + pub address_str: String, + /// Conneciton port. + pub udp_port: u16 +} + +impl NodeEndpoint { + /// Create endpoint from string. Performs name resolution if given a host name. + fn from_str(s: &str) -> Result { + let address = s.to_socket_addrs().map(|mut i| i.next()); + match address { + Ok(Some(a)) => Ok(NodeEndpoint { + address: a, + address_str: s.to_string(), + udp_port: a.port() + }), + Ok(_) => Err(UtilError::AddressResolve(None)), + Err(e) => Err(UtilError::AddressResolve(Some(e))) + } + } +} + +#[derive(PartialEq, Eq, Copy, Clone)] +pub enum PeerType { + Required, + Optional +} + +pub struct Node { + pub id: NodeId, + pub endpoint: NodeEndpoint, + pub peer_type: PeerType, + pub last_attempted: Option, +} + +impl FromStr for Node { + type Err = UtilError; + fn from_str(s: &str) -> Result { + let (id, endpoint) = if &s[0..8] == "enode://" && s.len() > 136 && &s[136..137] == "@" { + (try!(NodeId::from_str(&s[8..136])), try!(NodeEndpoint::from_str(&s[137..]))) + } + else { + (NodeId::new(), try!(NodeEndpoint::from_str(s))) + }; + + Ok(Node { + id: id, + endpoint: endpoint, + peer_type: PeerType::Optional, + last_attempted: None, + }) + } +} + +impl PartialEq for Node { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} +impl Eq for Node { } + +impl Hash for Node { + fn hash(&self, state: &mut H) where H: Hasher { + self.id.hash(state) + } +} +