diff --git a/Cargo.toml b/Cargo.toml index f92dcca53..4ed14551f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ rust-crypto = "0.2.34" elastic-array = "0.4" heapsize = "0.2" itertools = "0.4" +slab = { git = "https://github.com/arkpar/slab.git" } [dev-dependencies] json-tests = { path = "json-tests" } diff --git a/src/error.rs b/src/error.rs index 17b65ceec..04f7b96ce 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,6 +3,7 @@ use rustc_serialize::hex::FromHexError; use network::NetworkError; use rlp::DecoderError; +use io; #[derive(Debug)] pub enum BaseDataError { @@ -13,7 +14,8 @@ pub enum BaseDataError { /// General error type which should be capable of representing all errors in ethcore. pub enum UtilError { Crypto(::crypto::CryptoError), - Io(::std::io::Error), + StdIo(::std::io::Error), + Io(io::IoError), AddressParse(::std::net::AddrParseError), AddressResolve(Option<::std::io::Error>), FromHex(FromHexError), @@ -43,6 +45,12 @@ impl From for UtilError { impl From<::std::io::Error> for UtilError { fn from(err: ::std::io::Error) -> UtilError { + UtilError::StdIo(err) + } +} + +impl From for UtilError { + fn from(err: io::IoError) -> UtilError { UtilError::Io(err) } } diff --git a/src/hash.rs b/src/hash.rs index 8c1772b2c..3c90b841d 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -458,6 +458,16 @@ impl From for Address { } } } + +impl From for H64 { + fn from(value: H256) -> H64 { + unsafe { + let mut ret: H64 = ::std::mem::uninitialized(); + ::std::ptr::copy(value.as_ptr().offset(20), ret.as_mut_ptr(), 8); + ret + } + } +} /* impl<'_> From<&'_ H256> for Address { fn from(value: &'_ H256) -> Address { diff --git a/src/io/mod.rs b/src/io/mod.rs new file mode 100644 index 000000000..23a8509cc --- /dev/null +++ b/src/io/mod.rs @@ -0,0 +1,107 @@ +/// General IO module. +/// +/// Example usage for craeting a network service and adding an IO handler: +/// +/// ```rust +/// extern crate ethcore_util; +/// use ethcore_util::*; +/// +/// struct MyHandler; +/// +/// struct MyMessage { +/// data: u32 +/// } +/// +/// impl IoHandler for MyHandler { +/// fn initialize(&mut self, io: &mut IoContext) { +/// io.register_timer(1000).unwrap(); +/// } +/// +/// fn timeout(&mut self, _io: &mut IoContext, timer: TimerToken) { +/// println!("Timeout {}", timer); +/// } +/// +/// fn message(&mut self, _io: &mut IoContext, message: &mut MyMessage) { +/// println!("Message {}", message.data); +/// } +/// } +/// +/// fn main () { +/// let mut service = IoService::::start().expect("Error creating network service"); +/// service.register_handler(Box::new(MyHandler)).unwrap(); +/// +/// // Wait for quit condition +/// // ... +/// // Drop the service +/// } +/// ``` +mod service; + +#[derive(Debug)] +pub enum IoError { + Mio(::std::io::Error), +} + +impl From<::mio::NotifyError>> for IoError where Message: Send { + fn from(_err: ::mio::NotifyError>) -> IoError { + IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) + } +} + +/// Generic IO handler. +/// All the handler function are called from within IO event loop. +/// `Message` type is used as notification data +pub trait IoHandler: Send where Message: Send + 'static { + /// Initialize the handler + fn initialize<'s>(&'s mut self, _io: &mut IoContext<'s, Message>) {} + /// Timer function called after a timeout created with `HandlerIo::timeout`. + fn timeout<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _timer: TimerToken) {} + /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. + fn message<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _message: &'s mut Message) {} // TODO: make message immutable and provide internal channel for adding network handler + /// Called when an IO stream gets closed + fn stream_hup<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _stream: StreamToken) {} + /// Called when an IO stream can be read from + fn stream_readable<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _stream: StreamToken) {} + /// Called when an IO stream can be written to + fn stream_writable<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _stream: StreamToken) {} +} + +pub type TimerToken = service::TimerToken; +pub type StreamToken = service::StreamToken; +pub type IoContext<'s, M> = service::IoContext<'s, M>; +pub type IoService = service::IoService; +pub type IoChannel = service::IoChannel; +//pub const USER_TOKEN_START: usize = service::USER_TOKEN; // TODO: ICE in rustc 1.7.0-nightly (49c382779 2016-01-12) + +#[cfg(test)] +mod tests { + + use io::*; + + struct MyHandler; + + struct MyMessage { + data: u32 + } + + impl IoHandler for MyHandler { + fn initialize(&mut self, io: &mut IoContext) { + io.register_timer(1000).unwrap(); + } + + fn timeout(&mut self, _io: &mut IoContext, timer: TimerToken) { + println!("Timeout {}", timer); + } + + fn message(&mut self, _io: &mut IoContext, message: &mut MyMessage) { + println!("Message {}", message.data); + } + } + + #[test] + fn test_service_register_handler () { + let mut service = IoService::::start().expect("Error creating network service"); + service.register_handler(Box::new(MyHandler)).unwrap(); + } + +} diff --git a/src/io/service.rs b/src/io/service.rs new file mode 100644 index 000000000..4ccfb2407 --- /dev/null +++ b/src/io/service.rs @@ -0,0 +1,211 @@ +use std::thread::{self, JoinHandle}; +use mio::*; +use mio::util::{Slab}; +use hash::*; +use rlp::*; +use error::*; +use io::{IoError, IoHandler}; + +pub type TimerToken = usize; +pub type StreamToken = usize; + +// Tokens +const MAX_USER_TIMERS: usize = 32; +const USER_TIMER: usize = 0; +const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; +//const USER_TOKEN: usize = LAST_USER_TIMER + 1; + +/// Messages used to communicate with the event loop from other threads. +pub enum IoMessage where Message: Send + Sized { + /// Shutdown the event loop + Shutdown, + /// Register a new protocol handler. + AddHandler { + handler: Box+Send>, + }, + /// Broadcast a message across all protocol handlers. + UserMessage(Message) +} + +/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. +pub struct IoContext<'s, Message> where Message: Send + 'static { + timers: &'s mut Slab, + /// Low leve MIO Event loop for custom handler registration. + pub event_loop: &'s mut EventLoop>, +} + +impl<'s, Message> IoContext<'s, Message> where Message: Send + 'static { + /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler. + fn new(event_loop: &'s mut EventLoop>, timers: &'s mut Slab) -> IoContext<'s, Message> { + IoContext { + event_loop: event_loop, + timers: timers, + } + } + + /// Register a new IO timer. Returns a new timer token. 'IoHandler::timeout' will be called with the token. + pub fn register_timer(&mut self, ms: u64) -> Result { + match self.timers.insert(UserTimer { + delay: ms, + }) { + Ok(token) => { + self.event_loop.timeout_ms(token, ms).expect("Error registering user timer"); + Ok(token.as_usize()) + }, + _ => { panic!("Max timers reached") } + } + } + + /// Broadcast a message to other IO clients + pub fn message(&mut self, message: Message) { + match self.event_loop.channel().send(IoMessage::UserMessage(message)) { + Ok(_) => {} + Err(e) => { panic!("Error sending io message {:?}", e); } + } + } +} + +struct UserTimer { + delay: u64, +} + +/// Root IO handler. Manages user handlers, messages and IO timers. +pub struct IoManager where Message: Send { + timers: Slab, + handlers: Vec>>, +} + +impl IoManager where Message: Send + 'static { + /// Creates a new instance and registers it with the event loop. + pub fn start(event_loop: &mut EventLoop>) -> Result<(), UtilError> { + let mut io = IoManager { + timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS), + handlers: Vec::new(), + }; + try!(event_loop.run(&mut io)); + Ok(()) + } +} + +impl Handler for IoManager where Message: Send + 'static { + type Timeout = Token; + type Message = IoMessage; + + fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { + if events.is_hup() { + for h in self.handlers.iter_mut() { + h.stream_hup(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + } + else if events.is_readable() { + for h in self.handlers.iter_mut() { + h.stream_readable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + } + else if events.is_writable() { + for h in self.handlers.iter_mut() { + h.stream_writable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + } + } + + fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { + match token.as_usize() { + USER_TIMER ... LAST_USER_TIMER => { + let delay = { + let timer = self.timers.get_mut(token).expect("Unknown user timer token"); + timer.delay + }; + for h in self.handlers.iter_mut() { + h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + event_loop.timeout_ms(token, delay).expect("Error re-registering user timer"); + } + _ => { // Just pass the event down. IoHandler is supposed to re-register it if required. + for h in self.handlers.iter_mut() { + h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + } + } + } + + fn notify(&mut self, event_loop: &mut EventLoop, msg: Self::Message) { + let mut m = msg; + match m { + IoMessage::Shutdown => event_loop.shutdown(), + IoMessage::AddHandler { + handler, + } => { + self.handlers.push(handler); + self.handlers.last_mut().unwrap().initialize(&mut IoContext::new(event_loop, &mut self.timers)); + }, + IoMessage::UserMessage(ref mut data) => { + for h in self.handlers.iter_mut() { + h.message(&mut IoContext::new(event_loop, &mut self.timers), data); + } + } + } + } +} + +/// Allows sending messages into the event loop. All the IO handlers will get the message +/// in the `message` callback. +pub struct IoChannel where Message: Send { + channel: Sender> +} + +impl IoChannel where Message: Send { + pub fn send(&mut self, message: Message) -> Result<(), IoError> { + try!(self.channel.send(IoMessage::UserMessage(message))); + Ok(()) + } +} + +/// General IO Service. Starts an event loop and dispatches IO requests. +/// 'Message' is a notification message type +pub struct IoService where Message: Send + 'static { + thread: Option>, + host_channel: Sender> +} + +impl IoService where Message: Send + 'static { + /// Starts IO event loop + pub fn start() -> Result, UtilError> { + let mut event_loop = EventLoop::new().unwrap(); + let channel = event_loop.channel(); + let thread = thread::spawn(move || { + IoManager::::start(&mut event_loop).unwrap(); //TODO: + }); + Ok(IoService { + thread: Some(thread), + host_channel: channel + }) + } + + /// Regiter a IO hadnler with the event loop. + pub fn register_handler(&mut self, handler: Box+Send>) -> Result<(), IoError> { + try!(self.host_channel.send(IoMessage::AddHandler { + handler: handler, + })); + Ok(()) + } + + /// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads. + pub fn send_message(&mut self, message: Message) -> Result<(), IoError> { + try!(self.host_channel.send(IoMessage::UserMessage(message))); + Ok(()) + } + + /// Create a new message channel + pub fn channel(&mut self) -> IoChannel { + IoChannel { channel: self.host_channel.clone() } + } +} + +impl Drop for IoService where Message: Send { + fn drop(&mut self) { + self.host_channel.send(IoMessage::Shutdown).unwrap(); + self.thread.take().unwrap().join().unwrap(); + } +} + diff --git a/src/lib.rs b/src/lib.rs index 9cd49b494..1a7d873fe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,7 @@ //! sudo make install //! ``` +extern crate slab; extern crate rustc_serialize; extern crate mio; extern crate rand; @@ -76,6 +77,7 @@ pub mod nibbleslice; pub mod heapsizeof; pub mod squeeze; pub mod semantic_version; +pub mod io; pub mod network; pub use common::*; @@ -95,3 +97,4 @@ pub use heapsizeof::*; pub use squeeze::*; pub use semantic_version::*; pub use network::*; +pub use io::*; diff --git a/src/network/connection.rs b/src/network/connection.rs index 2c671a08a..f11c10384 100644 --- a/src/network/connection.rs +++ b/src/network/connection.rs @@ -1,14 +1,13 @@ use std::collections::VecDeque; -use mio::{Token, EventSet, EventLoop, Timeout, PollOpt, TryRead, TryWrite}; +use mio::{Handler, Token, EventSet, EventLoop, Timeout, PollOpt, TryRead, TryWrite}; use mio::tcp::*; use hash::*; use sha3::*; use bytes::*; use rlp::*; use std::io::{self, Cursor, Read}; -use network::host::{Host}; use error::*; -use network::NetworkError; +use network::error::NetworkError; use network::handshake::Handshake; use crypto; use rcrypto::blockmodes::*; @@ -133,7 +132,7 @@ impl Connection { } /// Register this connection with the IO event loop. - pub fn register(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { + pub fn register(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection register; token={:?}", self.token); self.interest.insert(EventSet::readable()); event_loop.register(&self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { @@ -143,7 +142,7 @@ impl Connection { } /// Update connection registration. Should be called at the end of the IO handler. - pub fn reregister(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { + pub fn reregister(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection reregister; token={:?}", self.token); event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { error!("Failed to reregister {:?}, {:?}", self.token, e); @@ -338,7 +337,7 @@ impl EncryptedConnection { } /// Readable IO handler. Tracker receive status and returns decoded packet if avaialable. - pub fn readable(&mut self, event_loop: &mut EventLoop) -> Result, UtilError> { + pub fn readable(&mut self, event_loop: &mut EventLoop) -> Result, UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); match self.read_state { EncryptedConnectionState::Header => { @@ -364,14 +363,14 @@ impl EncryptedConnection { } /// Writable IO handler. Processes send queeue. - pub fn writable(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn writable(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); try!(self.connection.writable()); Ok(()) } /// Register this connection with the event handler. - pub fn register(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn register>(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.connection.expect(ENCRYPTED_HEADER_LEN); self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok(); @@ -380,7 +379,7 @@ impl EncryptedConnection { } /// Update connection registration. This should be called at the end of the event loop. - pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { try!(self.connection.reregister(event_loop)); Ok(()) } diff --git a/src/network/discovery.rs b/src/network/discovery.rs index f2d139f45..d2a4f21a2 100644 --- a/src/network/discovery.rs +++ b/src/network/discovery.rs @@ -10,7 +10,7 @@ use mio::udp::*; use hash::*; use sha3::Hashable; use crypto::*; -use network::host::*; +use network::node::*; const ADDRESS_BYTES_SIZE: u32 = 32; ///< Size of address type in bytes. const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; ///< Denoted by n in [Kademlia]. @@ -70,14 +70,14 @@ impl Discovery { self.node_buckets[Discovery::distance(&self.id, &id) as usize].nodes.push(id.clone()); } - fn start_node_discovery(&mut self, event_loop: &mut EventLoop) { + fn start_node_discovery(&mut self, event_loop: &mut EventLoop) { self.discovery_round = 0; self.discovery_id.randomize(); self.discovery_nodes.clear(); self.discover(event_loop); } - fn discover(&mut self, event_loop: &mut EventLoop) { + fn discover(&mut self, event_loop: &mut EventLoop) { if self.discovery_round == DISCOVERY_MAX_STEPS { debug!("Restarting discovery"); diff --git a/src/network/error.rs b/src/network/error.rs new file mode 100644 index 000000000..d255cb043 --- /dev/null +++ b/src/network/error.rs @@ -0,0 +1,41 @@ +use io::IoError; +use rlp::*; + +#[derive(Debug, Copy, Clone)] +pub enum DisconnectReason +{ + DisconnectRequested, + //TCPError, + //BadProtocol, + UselessPeer, + //TooManyPeers, + //DuplicatePeer, + //IncompatibleProtocol, + //NullIdentity, + //ClientQuit, + //UnexpectedIdentity, + //LocalIdentity, + //PingTimeout, +} + +#[derive(Debug)] +pub enum NetworkError { + Auth, + BadProtocol, + PeerNotFound, + Disconnect(DisconnectReason), + Io(IoError), +} + +impl From for NetworkError { + fn from(_err: DecoderError) -> NetworkError { + NetworkError::Auth + } +} + +impl From for NetworkError { + fn from(err: IoError) -> NetworkError { + NetworkError::Io(err) + } +} + diff --git a/src/network/handshake.rs b/src/network/handshake.rs index af67f7ddf..ca95808b4 100644 --- a/src/network/handshake.rs +++ b/src/network/handshake.rs @@ -6,9 +6,10 @@ use bytes::Bytes; use crypto::*; use crypto; use network::connection::{Connection}; -use network::host::{NodeId, Host, HostInfo}; +use network::host::{HostInfo}; +use network::node::NodeId; use error::*; -use network::NetworkError; +use network::error::NetworkError; #[derive(PartialEq, Eq, Debug)] enum HandshakeState { @@ -88,7 +89,7 @@ impl Handshake { } /// Readable IO handler. Drives the state change. - pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), UtilError> { + pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); match self.state { HandshakeState::ReadingAuth => { @@ -118,7 +119,7 @@ impl Handshake { } /// Writabe IO handler. - pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { + pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); try!(self.connection.writable()); if self.state != HandshakeState::StartSession { @@ -128,7 +129,7 @@ impl Handshake { } /// Register the IO handler with the event loop - pub fn register(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn register>(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok(); try!(self.connection.register(event_loop)); diff --git a/src/network/host.rs b/src/network/host.rs index 11dc491b2..8b1038c8a 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -1,31 +1,30 @@ -use std::net::{SocketAddr, ToSocketAddrs}; +use std::mem; +use std::net::{SocketAddr}; use std::collections::{HashMap}; -use std::hash::{Hash, Hasher}; +use std::hash::{Hasher}; use std::str::{FromStr}; use mio::*; -use mio::util::{Slab}; use mio::tcp::*; use mio::udp::*; use hash::*; use crypto::*; use sha3::Hashable; use rlp::*; -use time::Tm; use network::handshake::Handshake; use network::session::{Session, SessionData}; use error::*; -use network::ProtocolHandler; +use io::*; +use network::NetworkProtocolHandler; +use network::node::*; + +type Slab = ::slab::Slab; const _DEFAULT_PORT: u16 = 30304; const MAX_CONNECTIONS: usize = 1024; -const MAX_USER_TIMERS: usize = 32; const IDEAL_PEERS: u32 = 10; -/// Node public key -pub type NodeId = H512; -/// IO Timer id -pub type TimerToken = usize; +const MAINTENANCE_TIMEOUT: u64 = 1000; #[derive(Debug)] struct NetworkConfiguration { @@ -48,88 +47,16 @@ impl NetworkConfiguration { } } -#[derive(Debug)] -/// Noe address info -pub struct NodeEndpoint { - /// IP(V4 or V6) address - address: SocketAddr, - /// Address as string (can be host name). - address_str: String, - /// Conneciton port. - udp_port: u16 -} - -impl NodeEndpoint { - /// Create endpoint from string. Performs name resolution if given a host name. - fn from_str(s: &str) -> Result { - let address = s.to_socket_addrs().map(|mut i| i.next()); - match address { - Ok(Some(a)) => Ok(NodeEndpoint { - address: a, - address_str: s.to_string(), - udp_port: a.port() - }), - Ok(_) => Err(UtilError::AddressResolve(None)), - Err(e) => Err(UtilError::AddressResolve(Some(e))) - } - } -} - -#[derive(PartialEq, Eq, Copy, Clone)] -enum PeerType { - Required, - Optional -} - -struct Node { - id: NodeId, - endpoint: NodeEndpoint, - peer_type: PeerType, - last_attempted: Option, -} - -impl FromStr for Node { - type Err = UtilError; - fn from_str(s: &str) -> Result { - let (id, endpoint) = if &s[0..8] == "enode://" && s.len() > 136 && &s[136..137] == "@" { - (try!(NodeId::from_str(&s[8..136])), try!(NodeEndpoint::from_str(&s[137..]))) - } - else { - (NodeId::new(), try!(NodeEndpoint::from_str(s))) - }; - - Ok(Node { - id: id, - endpoint: endpoint, - peer_type: PeerType::Optional, - last_attempted: None, - }) - } -} - -impl PartialEq for Node { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} -impl Eq for Node { } - -impl Hash for Node { - fn hash(&self, state: &mut H) where H: Hasher { - self.id.hash(state) - } -} - // Tokens -const TCP_ACCEPT: usize = 1; -const IDLE: usize = 3; -const NODETABLE_RECEIVE: usize = 4; -const NODETABLE_MAINTAIN: usize = 5; -const NODETABLE_DISCOVERY: usize = 6; -const FIRST_CONNECTION: usize = 7; +//const TOKEN_BEGIN: usize = USER_TOKEN_START; // TODO: ICE in rustc 1.7.0-nightly (49c382779 2016-01-12) +const TOKEN_BEGIN: usize = 32; +const TCP_ACCEPT: usize = TOKEN_BEGIN + 1; +const IDLE: usize = TOKEN_BEGIN + 2; +const NODETABLE_RECEIVE: usize = TOKEN_BEGIN + 3; +const NODETABLE_MAINTAIN: usize = TOKEN_BEGIN + 4; +const NODETABLE_DISCOVERY: usize = TOKEN_BEGIN + 5; +const FIRST_CONNECTION: usize = TOKEN_BEGIN + 16; const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1; -const USER_TIMER: usize = LAST_CONNECTION; -const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; /// Protocol handler level packet id pub type PacketId = u8; @@ -137,12 +64,10 @@ pub type PacketId = u8; pub type ProtocolId = &'static str; /// Messages used to communitate with the event loop from other threads. -pub enum HostMessage { - /// Shutdown the event loop - Shutdown, +pub enum NetworkIoMessage where Message: Send { /// Register a new protocol handler. AddHandler { - handler: Box, + handler: Option+Send>>, protocol: ProtocolId, versions: Vec, }, @@ -153,19 +78,8 @@ pub enum HostMessage { protocol: ProtocolId, data: Vec, }, - /// Broadcast a message across the protocol handlers. - UserMessage(UserMessage), -} - -/// Id for broadcast message -pub type UserMessageId = u32; - -/// User -pub struct UserMessage { - /// ID of a protocol - pub protocol: ProtocolId, - pub id: UserMessageId, - pub data: Option>, + /// User message + User(Message), } /// Local (temporary) peer session ID. @@ -190,21 +104,24 @@ impl Encodable for CapabilityInfo { } /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. -pub struct HostIo<'s> { +pub struct NetworkContext<'s, 'io, Message> where Message: Send + 'static, 'io: 's { + io: &'s mut IoContext<'io, NetworkIoMessage>, protocol: ProtocolId, connections: &'s mut Slab, - timers: &'s mut Slab, - session: Option, - event_loop: &'s mut EventLoop, + timers: &'s mut HashMap, + session: Option, } -impl<'s> HostIo<'s> { - /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler. - fn new(protocol: ProtocolId, session: Option, event_loop: &'s mut EventLoop, connections: &'s mut Slab, timers: &'s mut Slab) -> HostIo<'s> { - HostIo { +impl<'s, 'io, Message> NetworkContext<'s, 'io, Message> where Message: Send + 'static, { + /// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler. + fn new(io: &'s mut IoContext<'io, NetworkIoMessage>, + protocol: ProtocolId, + session: Option, connections: &'s mut Slab, + timers: &'s mut HashMap) -> NetworkContext<'s, 'io, Message> { + NetworkContext { + io: io, protocol: protocol, session: session, - event_loop: event_loop, connections: connections, timers: timers, } @@ -212,7 +129,7 @@ impl<'s> HostIo<'s> { /// Send a packet over the network to another peer. pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { - match self.connections.get_mut(Token(peer)) { + match self.connections.get_mut(peer) { Some(&mut ConnectionEntry::Session(ref mut s)) => { s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); @@ -228,49 +145,40 @@ impl<'s> HostIo<'s> { /// Respond to a current network message. Panics if no there is no packet in the context. pub fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { match self.session { - Some(session) => self.send(session.as_usize(), packet_id, data), + Some(session) => self.send(session, packet_id, data), None => { panic!("Respond: Session does not exist") } } } - /// Register a new IO timer. Returns a new timer toke. 'ProtocolHandler::timeout' will be called with the token. - pub fn register_timer(&mut self, ms: u64) -> Result{ - match self.timers.insert(UserTimer { - delay: ms, - protocol: self.protocol, - }) { - Ok(token) => { - self.event_loop.timeout_ms(token, ms).expect("Error registering user timer"); - Ok(token.as_usize()) - }, - _ => { panic!("Max timers reached") } - } - } - - /// Broadcast a message to other IO clients - pub fn message(&mut self, id: UserMessageId, data: Option>) { - match self.event_loop.channel().send(HostMessage::UserMessage(UserMessage { - protocol: self.protocol, - id: id, - data: data - })) { - Ok(_) => {} - Err(e) => { panic!("Error sending io message {:?}", e); } - } - } - /// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected. pub fn disable_peer(&mut self, _peer: PeerId) { //TODO: remove capability, disconnect if no capabilities left } -} + /// Register a new IO timer. Returns a new timer token. 'NetworkProtocolHandler::timeout' will be called with the token. + pub fn register_timer(&mut self, ms: u64) -> Result{ + match self.io.register_timer(ms) { + Ok(token) => { + self.timers.insert(token, self.protocol); + Ok(token) + }, + e => e, + } + } -struct UserTimer { - protocol: ProtocolId, - delay: u64, + /// Returns peer identification string + pub fn peer_info(&self, peer: PeerId) -> String { + match self.connections.get(peer) { + Some(&ConnectionEntry::Session(ref s)) => { + s.info.client_version.clone() + }, + _ => { + "unknown".to_string() + } + } + } } /// Shared host information @@ -315,67 +223,40 @@ enum ConnectionEntry { } /// Root IO handler. Manages protocol handlers, IO timers and network connections. -pub struct Host { - info: HostInfo, - _udp_socket: UdpSocket, - _listener: TcpListener, +pub struct Host where Message: Send { + pub info: HostInfo, + udp_socket: UdpSocket, + listener: TcpListener, connections: Slab, - timers: Slab, + timers: HashMap, nodes: HashMap, - handlers: HashMap>, - _idle_timeout: Timeout, + handlers: HashMap>>, } -impl Host { - /// Creates a new instance and registers it with the event loop. - pub fn start(event_loop: &mut EventLoop) -> Result<(), UtilError> { +impl Host where Message: Send { + pub fn new() -> Host { let config = NetworkConfiguration::new(); - /* - match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { - Some(iface) => config.public_address = iface.addr.unwrap(), - None => warn!("No public network interface"), - */ - let addr = config.listen_address; // Setup the server socket let listener = TcpListener::bind(&addr).unwrap(); - // Start listening for incoming connections - event_loop.register(&listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap(); - let idle_timeout = event_loop.timeout_ms(Token(IDLE), 1000).unwrap(); //TODO: check delay - // open the udp socket let udp_socket = UdpSocket::bound(&addr).unwrap(); - event_loop.register(&udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap(); - event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap(); - let port = config.listen_address.port(); - - let mut host = Host { + Host:: { info: HostInfo { keys: KeyPair::create().unwrap(), config: config, nonce: H256::random(), protocol_version: 4, client_version: "parity".to_string(), - listen_port: port, + listen_port: 0, capabilities: Vec::new(), }, - _udp_socket: udp_socket, - _listener: listener, - connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS), - timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS), + udp_socket: udp_socket, + listener: listener, + connections: Slab::new_starting_at(FIRST_CONNECTION, MAX_CONNECTIONS), + timers: HashMap::new(), nodes: HashMap::new(), handlers: HashMap::new(), - _idle_timeout: idle_timeout, - }; - - host.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303"); - host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@gav.ethdev.com:30300"); - host.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc@gav.ethdev.com:30303"); - host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); - host.add_node("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034@54.207.93.166:30303"); - host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@92.51.165.126:30303"); - - try!(event_loop.run(&mut host)); - Ok(()) + } } fn add_node(&mut self, id: &str) { @@ -387,8 +268,9 @@ impl Host { } } - fn maintain_network(&mut self, event_loop: &mut EventLoop) { - self.connect_peers(event_loop); + fn maintain_network(&mut self, io: &mut IoContext>) { + self.connect_peers(io); + io.event_loop.timeout_ms(Token(IDLE), MAINTENANCE_TIMEOUT).unwrap(); } fn have_session(&self, id: &NodeId) -> bool { @@ -399,8 +281,7 @@ impl Host { self.connections.iter().any(|e| match e { &ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false }) } - fn connect_peers(&mut self, event_loop: &mut EventLoop) { - + fn connect_peers(&mut self, io: &mut IoContext>) { struct NodeInfo { id: NodeId, peer_type: PeerType @@ -425,7 +306,7 @@ impl Host { for n in to_connect.iter() { if n.peer_type == PeerType::Required { if req_conn < IDEAL_PEERS { - self.connect_peer(&n.id, event_loop); + self.connect_peer(&n.id, io); } req_conn += 1; } @@ -440,14 +321,14 @@ impl Host { for n in to_connect.iter() { if n.peer_type == PeerType::Optional && open_slots > 0 { open_slots -= 1; - self.connect_peer(&n.id, event_loop); + self.connect_peer(&n.id, io); } } } } } - fn connect_peer(&mut self, id: &NodeId, event_loop: &mut EventLoop) { + fn connect_peer(&mut self, id: &NodeId, io: &mut IoContext>) { if self.have_session(id) { warn!("Aborted connect. Node already connected."); @@ -473,12 +354,12 @@ impl Host { }; let nonce = self.info.next_nonce(); - match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"))) { + match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(Token(token), id, socket, &nonce).expect("Can't create handshake"))) { Some(token) => { match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { h.start(&self.info, true) - .and_then(|_| h.register(event_loop)) + .and_then(|_| h.register(io.event_loop)) .unwrap_or_else (|e| { debug!(target: "net", "Handshake create error: {:?}", e); }); @@ -491,23 +372,23 @@ impl Host { } - fn accept(&mut self, _event_loop: &mut EventLoop) { + fn accept(&mut self, _io: &mut IoContext>) { trace!(target: "net", "accept"); } - fn connection_writable(&mut self, token: Token, event_loop: &mut EventLoop) { + fn connection_writable<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { let mut kill = false; let mut create_session = false; match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { - h.writable(event_loop, &self.info).unwrap_or_else(|e| { + h.writable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Handshake write error: {:?}", e); kill = true; }); create_session = h.done(); }, Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.writable(event_loop, &self.info).unwrap_or_else(|e| { + s.writable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Session write error: {:?}", e); kill = true; }); @@ -517,38 +398,38 @@ impl Host { } } if kill { - self.kill_connection(token, event_loop); - } - if create_session { - self.start_session(token, event_loop); + self.kill_connection(token, io); + return; + } else if create_session { + self.start_session(token, io); } match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.reregister(event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); + s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, _ => (), } } - fn connection_closed(&mut self, token: Token, event_loop: &mut EventLoop) { - self.kill_connection(token, event_loop); + fn connection_closed<'s>(&'s mut self, token: TimerToken, io: &mut IoContext<'s, NetworkIoMessage>) { + self.kill_connection(token, io); } - fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop) { + fn connection_readable<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { let mut kill = false; let mut create_session = false; let mut ready_data: Vec = Vec::new(); let mut packet_data: Option<(ProtocolId, PacketId, Vec)> = None; match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { - h.readable(event_loop, &self.info).unwrap_or_else(|e| { + h.readable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Handshake read error: {:?}", e); kill = true; }); create_session = h.done(); }, Some(&mut ConnectionEntry::Session(ref mut s)) => { - let sd = { s.readable(event_loop, &self.info).unwrap_or_else(|e| { + let sd = { s.readable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Session read error: {:?}", e); kill = true; SessionData::None @@ -579,33 +460,49 @@ impl Host { } } if kill { - self.kill_connection(token, event_loop); + self.kill_connection(token, io); + return; } if create_session { - self.start_session(token, event_loop); + self.start_session(token, io); } for p in ready_data { let mut h = self.handlers.get_mut(p).unwrap(); - h.connected(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize()); + h.connected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token); } if let Some((p, packet_id, data)) = packet_data { let mut h = self.handlers.get_mut(p).unwrap(); - h.read(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize(), packet_id, &data[1..]); + h.read(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token, packet_id, &data[1..]); } match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.reregister(event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); + s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, _ => (), } } - fn start_session(&mut self, token: Token, event_loop: &mut EventLoop) { + fn start_session(&mut self, token: StreamToken, io: &mut IoContext>) { let info = &self.info; + // TODO: use slab::replace_with (currently broken) + /* + match self.connections.remove(token) { + Some(ConnectionEntry::Handshake(h)) => { + match Session::new(h, io.event_loop, info) { + Ok(session) => { + assert!(token == self.connections.insert(ConnectionEntry::Session(session)).ok().unwrap()); + }, + Err(e) => { + debug!(target: "net", "Session construction error: {:?}", e); + } + } + }, + _ => panic!("Error updating slab with session") + }*/ self.connections.replace_with(token, |c| { match c { - ConnectionEntry::Handshake(h) => Session::new(h, event_loop, info) + ConnectionEntry::Handshake(h) => Session::new(h, io.event_loop, info) .map(|s| Some(ConnectionEntry::Session(s))) .unwrap_or_else(|e| { debug!(target: "net", "Session construction error: {:?}", e); @@ -616,12 +513,13 @@ impl Host { }).expect("Error updating slab with session"); } - fn connection_timeout(&mut self, token: Token, event_loop: &mut EventLoop) { - self.kill_connection(token, event_loop) + fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { + self.kill_connection(token, io) } - fn kill_connection(&mut self, token: Token, event_loop: &mut EventLoop) { + fn kill_connection<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { let mut to_disconnect: Vec = Vec::new(); + let mut remove = true; match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(_)) => (), // just abandon handshake Some(&mut ConnectionEntry::Session(ref mut s)) if s.is_ready() => { @@ -631,90 +529,112 @@ impl Host { } } }, - _ => (), + _ => { + remove = false; + }, } for p in to_disconnect { let mut h = self.handlers.get_mut(p).unwrap(); - h.disconnected(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize()); + h.disconnected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token); + } + if remove { + self.connections.remove(token); } - self.connections.remove(token); } } -impl Handler for Host { - type Timeout = Token; - type Message = HostMessage; +impl IoHandler> for Host where Message: Send + 'static { + /// Initialize networking + fn initialize(&mut self, io: &mut IoContext>) { + /* + match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { + Some(iface) => config.public_address = iface.addr.unwrap(), + None => warn!("No public network interface"), + */ - fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { - if events.is_hup() { - trace!(target: "net", "hup"); - match token.as_usize() { - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_closed(token, event_loop), - _ => warn!(target: "net", "Unexpected hup"), - }; - } - else if events.is_readable() { - match token.as_usize() { - TCP_ACCEPT => self.accept(event_loop), - IDLE => self.maintain_network(event_loop), - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(token, event_loop), - NODETABLE_RECEIVE => {}, - _ => panic!("Received unknown readable token"), - } - } - else if events.is_writable() { - match token.as_usize() { - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(token, event_loop), - _ => panic!("Received unknown writable token"), - } + // Start listening for incoming connections + io.event_loop.register(&self.listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap(); + io.event_loop.timeout_ms(Token(IDLE), MAINTENANCE_TIMEOUT).unwrap(); + // open the udp socket + io.event_loop.register(&self.udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap(); + io.event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap(); + let port = self.info.config.listen_address.port(); + self.info.listen_port = port; + + //self.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303"); + // GO bootnodes + self.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); // IE + self.add_node("enode://de471bccee3d042261d52e9bff31458daecc406142b401d4cd848f677479f73104b9fdeb090af9583d3391b7f10cb2ba9e26865dd5fca4fcdc0fb1e3b723c786@54.94.239.50:30303"); // BR + self.add_node("enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303"); // SG + // ETH/DEV cpp-ethereum (poc-9.ethdev.com) + self.add_node("enode://979b7fa28feeb35a4741660a16076f1943202cb72b6af70d327f053e248bab9ba81760f39d0701ef1d8f89cc1fbd2cacba0710a12cd5314d5e0c9021aa3637f9@5.1.83.226:30303"); + } + + fn stream_hup<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, stream: StreamToken) { + trace!(target: "net", "Hup: {}", stream); + match stream { + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_closed(stream, io), + _ => warn!(target: "net", "Unexpected hup"), + }; + } + + fn stream_readable<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, stream: StreamToken) { + match stream { + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(stream, io), + NODETABLE_RECEIVE => {}, + TCP_ACCEPT => self.accept(io), + _ => panic!("Received unknown readable token"), } } - fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { - match token.as_usize() { - IDLE => self.maintain_network(event_loop), - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, event_loop), + fn stream_writable<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, stream: StreamToken) { + match stream { + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(stream, io), + _ => panic!("Received unknown writable token"), + } + } + + fn timeout<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, token: TimerToken) { + match token { + IDLE => self.maintain_network(io), + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, io), NODETABLE_DISCOVERY => {}, NODETABLE_MAINTAIN => {}, - USER_TIMER ... LAST_USER_TIMER => { - let (protocol, delay) = { - let timer = self.timers.get_mut(token).expect("Unknown user timer token"); - (timer.protocol, timer.delay) - }; + _ => { + let protocol = *self.timers.get_mut(&token).expect("Unknown user timer token"); match self.handlers.get_mut(protocol) { None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, Some(h) => { - h.timeout(&mut HostIo::new(protocol, None, event_loop, &mut self.connections, &mut self.timers), token.as_usize()); - event_loop.timeout_ms(token, delay).expect("Error re-registering user timer"); + h.timeout(&mut NetworkContext::new(io, protocol, Some(token), &mut self.connections, &mut self.timers), token); } } } - _ => panic!("Unknown timer token"), } } - fn notify(&mut self, event_loop: &mut EventLoop, msg: Self::Message) { - match msg { - HostMessage::Shutdown => event_loop.shutdown(), - HostMessage::AddHandler { - handler, - protocol, - versions + fn message<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, message: &'s mut NetworkIoMessage) { + match message { + &mut NetworkIoMessage::AddHandler { + ref mut handler, + ref protocol, + ref versions } => { - self.handlers.insert(protocol, handler); + let mut h = mem::replace(handler, None).unwrap(); + h.initialize(&mut NetworkContext::new(io, protocol, None, &mut self.connections, &mut self.timers)); + self.handlers.insert(protocol, h); for v in versions { - self.info.capabilities.push(CapabilityInfo { protocol: protocol, version: v, packet_count:0 }); + self.info.capabilities.push(CapabilityInfo { protocol: protocol, version: *v, packet_count:0 }); } }, - HostMessage::Send { - peer, - packet_id, - protocol, - data, + &mut NetworkIoMessage::Send { + ref peer, + ref packet_id, + ref protocol, + ref data, } => { - match self.connections.get_mut(Token(peer as usize)) { + match self.connections.get_mut(*peer as usize) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.send_packet(protocol, packet_id as u8, &data).unwrap_or_else(|e| { + s.send_packet(protocol, *packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); }); //TODO: don't copy vector data }, @@ -723,11 +643,9 @@ impl Handler for Host { } } }, - HostMessage::UserMessage(message) => { + &mut NetworkIoMessage::User(ref message) => { for (p, h) in self.handlers.iter_mut() { - if p != &message.protocol { - h.message(&mut HostIo::new(message.protocol, None, event_loop, &mut self.connections, &mut self.timers), &message); - } + h.message(&mut NetworkContext::new(io, p, None, &mut self.connections, &mut self.timers), &message); } } } diff --git a/src/network/mod.rs b/src/network/mod.rs index cdce08d00..a47e88927 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -4,38 +4,42 @@ /// /// ```rust /// extern crate ethcore_util as util; -/// use util::network::*; +/// use util::*; /// /// struct MyHandler; /// -/// impl ProtocolHandler for MyHandler { -/// fn initialize(&mut self, io: &mut HandlerIo) { +/// struct MyMessage { +/// data: u32 +/// } +/// +/// impl NetworkProtocolHandler for MyHandler { +/// fn initialize(&mut self, io: &mut NetworkContext) { /// io.register_timer(1000); /// } /// -/// fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]) { +/// fn read(&mut self, io: &mut NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { /// println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer); /// } /// -/// fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId) { +/// fn connected(&mut self, io: &mut NetworkContext, peer: &PeerId) { /// println!("Connected {}", peer); /// } /// -/// fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId) { +/// fn disconnected(&mut self, io: &mut NetworkContext, peer: &PeerId) { /// println!("Disconnected {}", peer); /// } /// -/// fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken) { +/// fn timeout(&mut self, io: &mut NetworkContext, timer: TimerToken) { /// println!("Timeout {}", timer); /// } /// -/// fn message(&mut self, io: &mut HandlerIo, message: &Message) { -/// println!("Message {}:{}", message.protocol, message.id); +/// fn message(&mut self, io: &mut NetworkContext, message: &MyMessage) { +/// println!("Message {}", message.data); /// } /// } /// /// fn main () { -/// let mut service = NetworkService::start().expect("Error creating network service"); +/// let mut service = NetworkService::::start().expect("Error creating network service"); /// service.register_protocol(Box::new(MyHandler), "myproto", &[1u8]); /// /// // Wait for quit condition @@ -43,76 +47,40 @@ /// // Drop the service /// } /// ``` -extern crate mio; mod host; mod connection; mod handshake; mod session; mod discovery; mod service; - -#[derive(Debug, Copy, Clone)] -pub enum DisconnectReason -{ - DisconnectRequested, - TCPError, - BadProtocol, - UselessPeer, - TooManyPeers, - DuplicatePeer, - IncompatibleProtocol, - NullIdentity, - ClientQuit, - UnexpectedIdentity, - LocalIdentity, - PingTimeout, -} - -#[derive(Debug)] -pub enum NetworkError { - Auth, - BadProtocol, - PeerNotFound, - Disconnect(DisconnectReason), - Mio(::std::io::Error), -} - -impl From<::rlp::DecoderError> for NetworkError { - fn from(_err: ::rlp::DecoderError) -> NetworkError { - NetworkError::Auth - } -} - -impl From<::mio::NotifyError> for NetworkError { - fn from(_err: ::mio::NotifyError) -> NetworkError { - NetworkError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) - } -} +mod error; +mod node; pub type PeerId = host::PeerId; pub type PacketId = host::PacketId; -pub type TimerToken = host::TimerToken; -pub type HandlerIo<'s> = host::HostIo<'s>; -pub type Message = host::UserMessage; -pub type MessageId = host::UserMessageId; +pub type NetworkContext<'s,'io, Message> = host::NetworkContext<'s, 'io, Message>; +pub type NetworkService = service::NetworkService; +pub type NetworkIoMessage = host::NetworkIoMessage; +pub use network::host::NetworkIoMessage::User as UserMessage; +pub type NetworkError = error::NetworkError; + +use io::*; /// Network IO protocol handler. This needs to be implemented for each new subprotocol. -/// TODO: Separate p2p networking IO from IPC IO. `timeout` and `message` should go to a more genera IO provider. /// All the handler function are called from within IO event loop. -pub trait ProtocolHandler: Send { - /// Initialize the hadler - fn initialize(&mut self, io: &mut HandlerIo); +/// `Message` is the type for message data. +pub trait NetworkProtocolHandler: Send where Message: Send { + /// Initialize the handler + fn initialize(&mut self, _io: &mut NetworkContext) {} /// Called when new network packet received. - fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]); + fn read(&mut self, io: &mut NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]); /// Called when new peer is connected. Only called when peer supports the same protocol. - fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId); + fn connected(&mut self, io: &mut NetworkContext, peer: &PeerId); /// Called when a previously connected peer disconnects. - fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId); - /// Timer function called after a timeout created with `HandlerIo::timeout`. - fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken); - /// Called when a broadcasted message is received. The message can only be sent from a different protocol handler. - fn message(&mut self, io: &mut HandlerIo, message: &Message); + fn disconnected(&mut self, io: &mut NetworkContext, peer: &PeerId); + /// Timer function called after a timeout created with `NetworkContext::timeout`. + fn timeout(&mut self, _io: &mut NetworkContext, _timer: TimerToken) {} + /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. + fn message(&mut self, _io: &mut NetworkContext, _message: &Message) {} } -pub type NetworkService = service::NetworkService; - diff --git a/src/network/node.rs b/src/network/node.rs new file mode 100644 index 000000000..5c08e0a66 --- /dev/null +++ b/src/network/node.rs @@ -0,0 +1,83 @@ +use std::net::{SocketAddr, ToSocketAddrs}; +use std::hash::{Hash, Hasher}; +use std::str::{FromStr}; +use hash::*; +use rlp::*; +use time::Tm; +use error::*; + +/// Node public key +pub type NodeId = H512; + +#[derive(Debug)] +/// Noe address info +pub struct NodeEndpoint { + /// IP(V4 or V6) address + pub address: SocketAddr, + /// Address as string (can be host name). + pub address_str: String, + /// Conneciton port. + pub udp_port: u16 +} + +impl NodeEndpoint { + /// Create endpoint from string. Performs name resolution if given a host name. + fn from_str(s: &str) -> Result { + let address = s.to_socket_addrs().map(|mut i| i.next()); + match address { + Ok(Some(a)) => Ok(NodeEndpoint { + address: a, + address_str: s.to_string(), + udp_port: a.port() + }), + Ok(_) => Err(UtilError::AddressResolve(None)), + Err(e) => Err(UtilError::AddressResolve(Some(e))) + } + } +} + +#[derive(PartialEq, Eq, Copy, Clone)] +pub enum PeerType { + Required, + Optional +} + +pub struct Node { + pub id: NodeId, + pub endpoint: NodeEndpoint, + pub peer_type: PeerType, + pub last_attempted: Option, +} + +impl FromStr for Node { + type Err = UtilError; + fn from_str(s: &str) -> Result { + let (id, endpoint) = if &s[0..8] == "enode://" && s.len() > 136 && &s[136..137] == "@" { + (try!(NodeId::from_str(&s[8..136])), try!(NodeEndpoint::from_str(&s[137..]))) + } + else { + (NodeId::new(), try!(NodeEndpoint::from_str(s))) + }; + + Ok(Node { + id: id, + endpoint: endpoint, + peer_type: PeerType::Optional, + last_attempted: None, + }) + } +} + +impl PartialEq for Node { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} +impl Eq for Node { } + +impl Hash for Node { + fn hash(&self, state: &mut H) where H: Hasher { + self.id.hash(state) + } +} + diff --git a/src/network/service.rs b/src/network/service.rs index 23cbb57d9..48cc11152 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -1,32 +1,32 @@ -use std::thread::{self, JoinHandle}; -use mio::*; use error::*; -use network::{NetworkError, ProtocolHandler}; -use network::host::{Host, HostMessage, PeerId, PacketId, ProtocolId}; +use network::{NetworkProtocolHandler}; +use network::error::{NetworkError}; +use network::host::{Host, NetworkIoMessage, PeerId, PacketId, ProtocolId}; +use io::*; /// IO Service with networking -pub struct NetworkService { - thread: Option>, - host_channel: Sender +/// `Message` defines a notification data type. +pub struct NetworkService where Message: Send + 'static { + io_service: IoService>, + host_info: String, } -impl NetworkService { +impl NetworkService where Message: Send + 'static { /// Starts IO event loop - pub fn start() -> Result { - let mut event_loop = EventLoop::new().unwrap(); - let channel = event_loop.channel(); - let thread = thread::spawn(move || { - Host::start(&mut event_loop).unwrap(); //TODO: - }); + pub fn start() -> Result, UtilError> { + let mut io_service = try!(IoService::>::start()); + let host = Box::new(Host::new()); + let host_info = host.info.client_version.clone(); + try!(io_service.register_handler(host)); Ok(NetworkService { - thread: Some(thread), - host_channel: channel + io_service: io_service, + host_info: host_info, }) } /// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads. pub fn send(&mut self, peer: &PeerId, packet_id: PacketId, protocol: ProtocolId, data: &[u8]) -> Result<(), NetworkError> { - try!(self.host_channel.send(HostMessage::Send { + try!(self.io_service.send_message(NetworkIoMessage::Send { peer: *peer, packet_id: packet_id, protocol: protocol, @@ -36,20 +36,25 @@ impl NetworkService { } /// Regiter a new protocol handler with the event loop. - pub fn register_protocol(&mut self, handler: Box, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> { - try!(self.host_channel.send(HostMessage::AddHandler { - handler: handler, + pub fn register_protocol(&mut self, handler: Box+Send>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> { + try!(self.io_service.send_message(NetworkIoMessage::AddHandler { + handler: Some(handler), protocol: protocol, versions: versions.to_vec(), })); Ok(()) } -} -impl Drop for NetworkService { - fn drop(&mut self) { - self.host_channel.send(HostMessage::Shutdown).unwrap(); - self.thread.take().unwrap().join().unwrap(); + /// Returns host identifier string as advertised to other peers + pub fn host_info(&self) -> String { + self.host_info.clone() } + + /// Returns underlying io service. + pub fn io(&mut self) -> &mut IoService> { + &mut self.io_service + } + + } diff --git a/src/network/session.rs b/src/network/session.rs index 96390d366..828e4b062 100644 --- a/src/network/session.rs +++ b/src/network/session.rs @@ -4,8 +4,9 @@ use rlp::*; use network::connection::{EncryptedConnection, Packet}; use network::handshake::Handshake; use error::*; -use network::{NetworkError, DisconnectReason}; +use network::error::{NetworkError, DisconnectReason}; use network::host::*; +use network::node::NodeId; /// Peer session over encrypted connection. /// When created waits for Hello packet exchange and signals ready state. @@ -83,7 +84,7 @@ const PACKET_LAST: u8 = 0x7f; impl Session { /// Create a new session out of comepleted handshake. Consumes handshake object. - pub fn new(h: Handshake, event_loop: &mut EventLoop, host: &HostInfo) -> Result { + pub fn new>(h: Handshake, event_loop: &mut EventLoop, host: &HostInfo) -> Result { let id = h.id.clone(); let connection = try!(EncryptedConnection::new(h)); let mut session = Session { @@ -108,7 +109,7 @@ impl Session { } /// Readable IO handler. Returns packet data if available. - pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result { + pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result { match try!(self.connection.readable(event_loop)) { Some(data) => Ok(try!(self.read_packet(data, host))), None => Ok(SessionData::None) @@ -116,7 +117,7 @@ impl Session { } /// Writable IO handler. Sends pending packets. - pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { + pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { self.connection.writable(event_loop) } @@ -126,7 +127,7 @@ impl Session { } /// Update registration with the event loop. Should be called at the end of the IO handler. - pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.connection.reregister(event_loop) } @@ -241,6 +242,7 @@ impl Session { i += 1; } trace!(target: "net", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps); + self.info.client_version = client_version; self.info.capabilities = caps; if protocol != host.protocol_version { return Err(From::from(self.disconnect(DisconnectReason::UselessPeer)));