From c98f73c5c9953bb122322dc5a3b64a7021aa5c41 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 13 Jan 2016 11:31:37 +0100 Subject: [PATCH] Finished splitting IoService and NetworkService --- src/error.rs | 10 +- src/io/mod.rs | 23 +- src/io/service.rs | 58 ++--- src/network/connection.rs | 17 +- src/network/discovery.rs | 6 +- src/network/handshake.rs | 11 +- src/network/host.rs | 470 +++++++++++++++----------------------- src/network/mod.rs | 84 ++----- src/network/service.rs | 43 ++-- src/network/session.rs | 11 +- 10 files changed, 296 insertions(+), 437 deletions(-) diff --git a/src/error.rs b/src/error.rs index 17b65ceec..04f7b96ce 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,6 +3,7 @@ use rustc_serialize::hex::FromHexError; use network::NetworkError; use rlp::DecoderError; +use io; #[derive(Debug)] pub enum BaseDataError { @@ -13,7 +14,8 @@ pub enum BaseDataError { /// General error type which should be capable of representing all errors in ethcore. pub enum UtilError { Crypto(::crypto::CryptoError), - Io(::std::io::Error), + StdIo(::std::io::Error), + Io(io::IoError), AddressParse(::std::net::AddrParseError), AddressResolve(Option<::std::io::Error>), FromHex(FromHexError), @@ -43,6 +45,12 @@ impl From for UtilError { impl From<::std::io::Error> for UtilError { fn from(err: ::std::io::Error) -> UtilError { + UtilError::StdIo(err) + } +} + +impl From for UtilError { + fn from(err: io::IoError) -> UtilError { UtilError::Io(err) } } diff --git a/src/io/mod.rs b/src/io/mod.rs index e8562e64a..72906b6f4 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -51,18 +51,35 @@ pub enum IoError { Mio(::std::io::Error), } -impl From<::mio::NotifyError>> for IoError { +impl From<::mio::NotifyError>> for IoError where M: Send { fn from(_err: ::mio::NotifyError>) -> IoError { IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) } } +/// Generic IO handler. +/// All the handler function are called from within IO event loop. +/// `Message` type is used as notification data +pub trait IoHandler: Send where Message: Send + 'static { + /// Initialize the handler + fn initialize(&mut self, _io: IoContext) {} + /// Timer function called after a timeout created with `HandlerIo::timeout`. + fn timeout(&mut self, _io: IoContext, _timer: TimerToken) {} + /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. + fn message(&mut self, _io: IoContext, _message: &mut Message) {} // TODO: make message immutable and provide internal channel for adding network handler + /// Called when an IO stream gets closed + fn stream_hup(&mut self, _io: IoContext, _stream: StreamToken) {} + /// Called when an IO stream can be read from + fn stream_readable(&mut self, _io: IoContext, _stream: StreamToken) {} + /// Called when an IO stream can be written to + fn stream_writable(&mut self, _io: IoContext, _stream: StreamToken) {} +} + pub type TimerToken = service::TimerToken; pub type StreamToken = service::StreamToken; pub type IoContext<'s, M> = service::IoContext<'s, M>; -pub type Message = service::UserMessage; pub type IoService = service::IoService; -pub type IoHandler = service::IoHandler; +pub const USER_TOKEN_START: usize = service::USER_TOKEN; diff --git a/src/io/service.rs b/src/io/service.rs index 4ecc34723..66814ba3c 100644 --- a/src/io/service.rs +++ b/src/io/service.rs @@ -4,24 +4,7 @@ use mio::util::{Slab}; use hash::*; use rlp::*; use error::*; -use io::IoError; - -/// Generic IO handler. -/// All the handler function are called from within IO event loop. -pub trait IoHandler: Send where M: Send + 'static { - /// Initialize the hadler - fn initialize(&mut self, _io: &mut IoContext) {} - /// Timer function called after a timeout created with `HandlerIo::timeout`. - fn timeout(&mut self, _io: &mut IoContext, _timer: TimerToken) {} - /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. - fn message(&mut self, _io: &mut IoContext, _message: &M) {} - /// Called when an IO stream gets closed - fn stream_hup(&mut self, _io: &mut IoContext, _stream: StreamToken) {} - /// Called when an IO stream can be read from - fn stream_readable(&mut self, _io: &mut IoContext, _stream: StreamToken) {} - /// Called when an IO stream can be written to - fn stream_writable(&mut self, _io: &mut IoContext, _stream: StreamToken) {} -} +use io::{IoError, IoHandler}; pub type TimerToken = usize; pub type StreamToken = usize; @@ -30,9 +13,10 @@ pub type StreamToken = usize; const MAX_USER_TIMERS: usize = 32; const USER_TIMER: usize = 0; const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; +pub const USER_TOKEN: usize = LAST_USER_TIMER + 1; /// Messages used to communicate with the event loop from other threads. -pub enum IoMessage { +pub enum IoMessage where M: Send + Sized { /// Shutdown the event loop Shutdown, /// Register a new protocol handler. @@ -40,18 +24,13 @@ pub enum IoMessage { handler: Box+Send>, }, /// Broadcast a message across all protocol handlers. - UserMessage(UserMessage), -} - -/// User -pub struct UserMessage { - pub data: M, + UserMessage(M) } /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. pub struct IoContext<'s, M> where M: Send + 'static { timers: &'s mut Slab, - event_loop: &'s mut EventLoop>, + pub event_loop: &'s mut EventLoop>, } impl<'s, M> IoContext<'s, M> where M: Send + 'static { @@ -78,9 +57,7 @@ impl<'s, M> IoContext<'s, M> where M: Send + 'static { /// Broadcast a message to other IO clients pub fn message(&mut self, message: M) { - match self.event_loop.channel().send(IoMessage::UserMessage(UserMessage { - data: message - })) { + match self.event_loop.channel().send(IoMessage::UserMessage(message)) { Ok(_) => {} Err(e) => { panic!("Error sending io message {:?}", e); } } @@ -116,17 +93,17 @@ impl Handler for IoManager where M: Send + 'static { fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { if events.is_hup() { for h in self.handlers.iter_mut() { - h.stream_hup(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.stream_hup(IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } else if events.is_readable() { for h in self.handlers.iter_mut() { - h.stream_readable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.stream_readable(IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } else if events.is_writable() { for h in self.handlers.iter_mut() { - h.stream_writable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.stream_writable(IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } } @@ -139,29 +116,30 @@ impl Handler for IoManager where M: Send + 'static { timer.delay }; for h in self.handlers.iter_mut() { - h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.timeout(IoContext::new(event_loop, &mut self.timers), token.as_usize()); } event_loop.timeout_ms(token, delay).expect("Error re-registering user timer"); } _ => { // Just pass the event down. IoHandler is supposed to re-register it if required. for h in self.handlers.iter_mut() { - h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.timeout(IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } } } fn notify(&mut self, event_loop: &mut EventLoop, msg: Self::Message) { - match msg { + let mut m = msg; + match m { IoMessage::Shutdown => event_loop.shutdown(), IoMessage::AddHandler { handler, } => { self.handlers.push(handler); }, - IoMessage::UserMessage(message) => { + IoMessage::UserMessage(ref mut data) => { for h in self.handlers.iter_mut() { - h.message(&mut IoContext::new(event_loop, &mut self.timers), &message.data); + h.message(IoContext::new(event_loop, &mut self.timers), data); } } } @@ -196,6 +174,12 @@ impl IoService where M: Send + 'static { })); Ok(()) } + + /// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads. + pub fn send_message(&mut self, message: M) -> Result<(), IoError> { + try!(self.host_channel.send(IoMessage::UserMessage(message))); + Ok(()) + } } impl Drop for IoService where M: Send { diff --git a/src/network/connection.rs b/src/network/connection.rs index 2c671a08a..f11c10384 100644 --- a/src/network/connection.rs +++ b/src/network/connection.rs @@ -1,14 +1,13 @@ use std::collections::VecDeque; -use mio::{Token, EventSet, EventLoop, Timeout, PollOpt, TryRead, TryWrite}; +use mio::{Handler, Token, EventSet, EventLoop, Timeout, PollOpt, TryRead, TryWrite}; use mio::tcp::*; use hash::*; use sha3::*; use bytes::*; use rlp::*; use std::io::{self, Cursor, Read}; -use network::host::{Host}; use error::*; -use network::NetworkError; +use network::error::NetworkError; use network::handshake::Handshake; use crypto; use rcrypto::blockmodes::*; @@ -133,7 +132,7 @@ impl Connection { } /// Register this connection with the IO event loop. - pub fn register(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { + pub fn register(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection register; token={:?}", self.token); self.interest.insert(EventSet::readable()); event_loop.register(&self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { @@ -143,7 +142,7 @@ impl Connection { } /// Update connection registration. Should be called at the end of the IO handler. - pub fn reregister(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { + pub fn reregister(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection reregister; token={:?}", self.token); event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { error!("Failed to reregister {:?}, {:?}", self.token, e); @@ -338,7 +337,7 @@ impl EncryptedConnection { } /// Readable IO handler. Tracker receive status and returns decoded packet if avaialable. - pub fn readable(&mut self, event_loop: &mut EventLoop) -> Result, UtilError> { + pub fn readable(&mut self, event_loop: &mut EventLoop) -> Result, UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); match self.read_state { EncryptedConnectionState::Header => { @@ -364,14 +363,14 @@ impl EncryptedConnection { } /// Writable IO handler. Processes send queeue. - pub fn writable(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn writable(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); try!(self.connection.writable()); Ok(()) } /// Register this connection with the event handler. - pub fn register(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn register>(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.connection.expect(ENCRYPTED_HEADER_LEN); self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok(); @@ -380,7 +379,7 @@ impl EncryptedConnection { } /// Update connection registration. This should be called at the end of the event loop. - pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { try!(self.connection.reregister(event_loop)); Ok(()) } diff --git a/src/network/discovery.rs b/src/network/discovery.rs index f2d139f45..d2a4f21a2 100644 --- a/src/network/discovery.rs +++ b/src/network/discovery.rs @@ -10,7 +10,7 @@ use mio::udp::*; use hash::*; use sha3::Hashable; use crypto::*; -use network::host::*; +use network::node::*; const ADDRESS_BYTES_SIZE: u32 = 32; ///< Size of address type in bytes. const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; ///< Denoted by n in [Kademlia]. @@ -70,14 +70,14 @@ impl Discovery { self.node_buckets[Discovery::distance(&self.id, &id) as usize].nodes.push(id.clone()); } - fn start_node_discovery(&mut self, event_loop: &mut EventLoop) { + fn start_node_discovery(&mut self, event_loop: &mut EventLoop) { self.discovery_round = 0; self.discovery_id.randomize(); self.discovery_nodes.clear(); self.discover(event_loop); } - fn discover(&mut self, event_loop: &mut EventLoop) { + fn discover(&mut self, event_loop: &mut EventLoop) { if self.discovery_round == DISCOVERY_MAX_STEPS { debug!("Restarting discovery"); diff --git a/src/network/handshake.rs b/src/network/handshake.rs index af67f7ddf..ca95808b4 100644 --- a/src/network/handshake.rs +++ b/src/network/handshake.rs @@ -6,9 +6,10 @@ use bytes::Bytes; use crypto::*; use crypto; use network::connection::{Connection}; -use network::host::{NodeId, Host, HostInfo}; +use network::host::{HostInfo}; +use network::node::NodeId; use error::*; -use network::NetworkError; +use network::error::NetworkError; #[derive(PartialEq, Eq, Debug)] enum HandshakeState { @@ -88,7 +89,7 @@ impl Handshake { } /// Readable IO handler. Drives the state change. - pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), UtilError> { + pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); match self.state { HandshakeState::ReadingAuth => { @@ -118,7 +119,7 @@ impl Handshake { } /// Writabe IO handler. - pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { + pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); try!(self.connection.writable()); if self.state != HandshakeState::StartSession { @@ -128,7 +129,7 @@ impl Handshake { } /// Register the IO handler with the event loop - pub fn register(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn register>(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok(); try!(self.connection.register(event_loop)); diff --git a/src/network/host.rs b/src/network/host.rs index 11dc491b2..d61ef614c 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -1,6 +1,7 @@ -use std::net::{SocketAddr, ToSocketAddrs}; +use std::mem; +use std::net::{SocketAddr }; use std::collections::{HashMap}; -use std::hash::{Hash, Hasher}; +use std::hash::{Hasher}; use std::str::{FromStr}; use mio::*; use mio::util::{Slab}; @@ -10,23 +11,18 @@ use hash::*; use crypto::*; use sha3::Hashable; use rlp::*; -use time::Tm; use network::handshake::Handshake; use network::session::{Session, SessionData}; use error::*; -use network::ProtocolHandler; +use io::*; +use network::NetworkProtocolHandler; +use network::node::*; const _DEFAULT_PORT: u16 = 30304; const MAX_CONNECTIONS: usize = 1024; -const MAX_USER_TIMERS: usize = 32; const IDEAL_PEERS: u32 = 10; -/// Node public key -pub type NodeId = H512; -/// IO Timer id -pub type TimerToken = usize; - #[derive(Debug)] struct NetworkConfiguration { listen_address: SocketAddr, @@ -48,88 +44,15 @@ impl NetworkConfiguration { } } -#[derive(Debug)] -/// Noe address info -pub struct NodeEndpoint { - /// IP(V4 or V6) address - address: SocketAddr, - /// Address as string (can be host name). - address_str: String, - /// Conneciton port. - udp_port: u16 -} - -impl NodeEndpoint { - /// Create endpoint from string. Performs name resolution if given a host name. - fn from_str(s: &str) -> Result { - let address = s.to_socket_addrs().map(|mut i| i.next()); - match address { - Ok(Some(a)) => Ok(NodeEndpoint { - address: a, - address_str: s.to_string(), - udp_port: a.port() - }), - Ok(_) => Err(UtilError::AddressResolve(None)), - Err(e) => Err(UtilError::AddressResolve(Some(e))) - } - } -} - -#[derive(PartialEq, Eq, Copy, Clone)] -enum PeerType { - Required, - Optional -} - -struct Node { - id: NodeId, - endpoint: NodeEndpoint, - peer_type: PeerType, - last_attempted: Option, -} - -impl FromStr for Node { - type Err = UtilError; - fn from_str(s: &str) -> Result { - let (id, endpoint) = if &s[0..8] == "enode://" && s.len() > 136 && &s[136..137] == "@" { - (try!(NodeId::from_str(&s[8..136])), try!(NodeEndpoint::from_str(&s[137..]))) - } - else { - (NodeId::new(), try!(NodeEndpoint::from_str(s))) - }; - - Ok(Node { - id: id, - endpoint: endpoint, - peer_type: PeerType::Optional, - last_attempted: None, - }) - } -} - -impl PartialEq for Node { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} -impl Eq for Node { } - -impl Hash for Node { - fn hash(&self, state: &mut H) where H: Hasher { - self.id.hash(state) - } -} - // Tokens -const TCP_ACCEPT: usize = 1; -const IDLE: usize = 3; -const NODETABLE_RECEIVE: usize = 4; -const NODETABLE_MAINTAIN: usize = 5; -const NODETABLE_DISCOVERY: usize = 6; -const FIRST_CONNECTION: usize = 7; +const TOKEN_BEGIN: usize = USER_TOKEN_START; +const TCP_ACCEPT: usize = TOKEN_BEGIN; +const IDLE: usize = TOKEN_BEGIN + 1; +const NODETABLE_RECEIVE: usize = TOKEN_BEGIN + 2; +const NODETABLE_MAINTAIN: usize = TOKEN_BEGIN + 3; +const NODETABLE_DISCOVERY: usize = TOKEN_BEGIN + 4; +const FIRST_CONNECTION: usize = TOKEN_BEGIN + 16; const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1; -const USER_TIMER: usize = LAST_CONNECTION; -const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; /// Protocol handler level packet id pub type PacketId = u8; @@ -137,12 +60,10 @@ pub type PacketId = u8; pub type ProtocolId = &'static str; /// Messages used to communitate with the event loop from other threads. -pub enum HostMessage { - /// Shutdown the event loop - Shutdown, +pub enum NetworkIoMessage where Message: Send { /// Register a new protocol handler. AddHandler { - handler: Box, + handler: Option+Send>>, protocol: ProtocolId, versions: Vec, }, @@ -153,19 +74,11 @@ pub enum HostMessage { protocol: ProtocolId, data: Vec, }, - /// Broadcast a message across the protocol handlers. - UserMessage(UserMessage), -} - -/// Id for broadcast message -pub type UserMessageId = u32; - -/// User -pub struct UserMessage { - /// ID of a protocol - pub protocol: ProtocolId, - pub id: UserMessageId, - pub data: Option>, + /// User message + User { + protocol: ProtocolId, + message: Message, + }, } /// Local (temporary) peer session ID. @@ -190,31 +103,38 @@ impl Encodable for CapabilityInfo { } /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. -pub struct HostIo<'s> { - protocol: ProtocolId, +pub struct NetworkContext<'s, Message> where Message: Send + 'static { + io: IoContext<'s, NetworkIoMessage>, + protocol: Option, connections: &'s mut Slab, - timers: &'s mut Slab, - session: Option, - event_loop: &'s mut EventLoop, + timers: &'s mut HashMap, + session: Option, } -impl<'s> HostIo<'s> { - /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler. - fn new(protocol: ProtocolId, session: Option, event_loop: &'s mut EventLoop, connections: &'s mut Slab, timers: &'s mut Slab) -> HostIo<'s> { - HostIo { +impl<'s, Message> NetworkContext<'s, Message> where Message: Send + 'static, { + /// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler. + fn new(io: IoContext<'s, NetworkIoMessage>, + protocol: Option, + session: Option, connections: &'s mut Slab, + timers: &'s mut HashMap) -> NetworkContext<'s, Message> { + NetworkContext { + io: io, protocol: protocol, session: session, - event_loop: event_loop, connections: connections, timers: timers, } } + fn set_protocol(&mut self, protocol: ProtocolId) { + self.protocol = Some(protocol); + } + /// Send a packet over the network to another peer. pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { match self.connections.get_mut(Token(peer)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { + s.send_packet(self.protocol.unwrap(), packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); }); //TODO: don't copy vector data }, @@ -228,49 +148,28 @@ impl<'s> HostIo<'s> { /// Respond to a current network message. Panics if no there is no packet in the context. pub fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { match self.session { - Some(session) => self.send(session.as_usize(), packet_id, data), + Some(session) => self.send(session, packet_id, data), None => { panic!("Respond: Session does not exist") } } } - /// Register a new IO timer. Returns a new timer toke. 'ProtocolHandler::timeout' will be called with the token. - pub fn register_timer(&mut self, ms: u64) -> Result{ - match self.timers.insert(UserTimer { - delay: ms, - protocol: self.protocol, - }) { - Ok(token) => { - self.event_loop.timeout_ms(token, ms).expect("Error registering user timer"); - Ok(token.as_usize()) - }, - _ => { panic!("Max timers reached") } - } - } - - /// Broadcast a message to other IO clients - pub fn message(&mut self, id: UserMessageId, data: Option>) { - match self.event_loop.channel().send(HostMessage::UserMessage(UserMessage { - protocol: self.protocol, - id: id, - data: data - })) { - Ok(_) => {} - Err(e) => { panic!("Error sending io message {:?}", e); } - } - } - /// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected. pub fn disable_peer(&mut self, _peer: PeerId) { //TODO: remove capability, disconnect if no capabilities left } -} - -struct UserTimer { - protocol: ProtocolId, - delay: u64, + /// Register a new IO timer. Returns a new timer token. 'NetworkProtocolHandler::timeout' will be called with the token. + pub fn register_timer(&mut self, ms: u64) -> Result{ + match self.io.register_timer(ms) { + Ok(token) => { + self.timers.insert(token, self.protocol.unwrap()); + Ok(token) + }, + e @ Err(_) => e, + } + } } /// Shared host information @@ -315,67 +214,40 @@ enum ConnectionEntry { } /// Root IO handler. Manages protocol handlers, IO timers and network connections. -pub struct Host { +pub struct Host where Message: Send { info: HostInfo, - _udp_socket: UdpSocket, - _listener: TcpListener, + udp_socket: UdpSocket, + listener: TcpListener, connections: Slab, - timers: Slab, + timers: HashMap, nodes: HashMap, - handlers: HashMap>, - _idle_timeout: Timeout, + handlers: HashMap>>, } -impl Host { - /// Creates a new instance and registers it with the event loop. - pub fn start(event_loop: &mut EventLoop) -> Result<(), UtilError> { +impl Host where Message: Send { + pub fn new() -> Host { let config = NetworkConfiguration::new(); - /* - match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { - Some(iface) => config.public_address = iface.addr.unwrap(), - None => warn!("No public network interface"), - */ - let addr = config.listen_address; // Setup the server socket let listener = TcpListener::bind(&addr).unwrap(); - // Start listening for incoming connections - event_loop.register(&listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap(); - let idle_timeout = event_loop.timeout_ms(Token(IDLE), 1000).unwrap(); //TODO: check delay - // open the udp socket let udp_socket = UdpSocket::bound(&addr).unwrap(); - event_loop.register(&udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap(); - event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap(); - let port = config.listen_address.port(); - - let mut host = Host { + Host:: { info: HostInfo { keys: KeyPair::create().unwrap(), config: config, nonce: H256::random(), protocol_version: 4, client_version: "parity".to_string(), - listen_port: port, + listen_port: 0, capabilities: Vec::new(), }, - _udp_socket: udp_socket, - _listener: listener, + udp_socket: udp_socket, + listener: listener, connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS), - timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS), + timers: HashMap::new(), nodes: HashMap::new(), handlers: HashMap::new(), - _idle_timeout: idle_timeout, - }; - - host.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303"); - host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@gav.ethdev.com:30300"); - host.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc@gav.ethdev.com:30303"); - host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); - host.add_node("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034@54.207.93.166:30303"); - host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@92.51.165.126:30303"); - - try!(event_loop.run(&mut host)); - Ok(()) + } } fn add_node(&mut self, id: &str) { @@ -387,8 +259,8 @@ impl Host { } } - fn maintain_network(&mut self, event_loop: &mut EventLoop) { - self.connect_peers(event_loop); + fn maintain_network(&mut self, io: IoContext>) { + self.connect_peers(io); } fn have_session(&self, id: &NodeId) -> bool { @@ -399,8 +271,7 @@ impl Host { self.connections.iter().any(|e| match e { &ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false }) } - fn connect_peers(&mut self, event_loop: &mut EventLoop) { - + fn connect_peers(&mut self, mut io: IoContext>) { struct NodeInfo { id: NodeId, peer_type: PeerType @@ -425,7 +296,7 @@ impl Host { for n in to_connect.iter() { if n.peer_type == PeerType::Required { if req_conn < IDEAL_PEERS { - self.connect_peer(&n.id, event_loop); + self.connect_peer(&n.id, &mut io); } req_conn += 1; } @@ -440,14 +311,14 @@ impl Host { for n in to_connect.iter() { if n.peer_type == PeerType::Optional && open_slots > 0 { open_slots -= 1; - self.connect_peer(&n.id, event_loop); + self.connect_peer(&n.id, &mut io); } } } } } - fn connect_peer(&mut self, id: &NodeId, event_loop: &mut EventLoop) { + fn connect_peer(&mut self, id: &NodeId, io: &mut IoContext>) { if self.have_session(id) { warn!("Aborted connect. Node already connected."); @@ -478,7 +349,7 @@ impl Host { match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { h.start(&self.info, true) - .and_then(|_| h.register(event_loop)) + .and_then(|_| h.register(io.event_loop)) .unwrap_or_else (|e| { debug!(target: "net", "Handshake create error: {:?}", e); }); @@ -491,23 +362,23 @@ impl Host { } - fn accept(&mut self, _event_loop: &mut EventLoop) { + fn accept(&mut self, _io: IoContext>) { trace!(target: "net", "accept"); } - fn connection_writable(&mut self, token: Token, event_loop: &mut EventLoop) { + fn connection_writable<'s>(&'s mut self, token: StreamToken, io: IoContext<'s, NetworkIoMessage>) { let mut kill = false; let mut create_session = false; - match self.connections.get_mut(token) { + match self.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { - h.writable(event_loop, &self.info).unwrap_or_else(|e| { + h.writable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Handshake write error: {:?}", e); kill = true; }); create_session = h.done(); }, Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.writable(event_loop, &self.info).unwrap_or_else(|e| { + s.writable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Session write error: {:?}", e); kill = true; }); @@ -516,39 +387,41 @@ impl Host { warn!(target: "net", "Received event for unknown connection"); } } + let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); if kill { - self.kill_connection(token, event_loop); + Host::kill_connection(token, &mut net_context, &mut self.handlers); } if create_session { - self.start_session(token, event_loop); + Host::start_session(&self.info, token, &mut net_context); } - match self.connections.get_mut(token) { + match net_context.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.reregister(event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); + s.reregister(net_context.io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, _ => (), } } - fn connection_closed(&mut self, token: Token, event_loop: &mut EventLoop) { - self.kill_connection(token, event_loop); + fn connection_closed<'s>(&'s mut self, token: TimerToken, io: IoContext<'s, NetworkIoMessage>) { + let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); + Host::kill_connection(token, &mut net_context, &mut self.handlers); } - fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop) { + fn connection_readable<'s>(&'s mut self, token: StreamToken, io: IoContext<'s, NetworkIoMessage>) { let mut kill = false; let mut create_session = false; let mut ready_data: Vec = Vec::new(); let mut packet_data: Option<(ProtocolId, PacketId, Vec)> = None; - match self.connections.get_mut(token) { + match self.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { - h.readable(event_loop, &self.info).unwrap_or_else(|e| { + h.readable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Handshake read error: {:?}", e); kill = true; }); create_session = h.done(); }, Some(&mut ConnectionEntry::Session(ref mut s)) => { - let sd = { s.readable(event_loop, &self.info).unwrap_or_else(|e| { + let sd = { s.readable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Session read error: {:?}", e); kill = true; SessionData::None @@ -578,32 +451,35 @@ impl Host { warn!(target: "net", "Received event for unknown connection"); } } + let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); if kill { - self.kill_connection(token, event_loop); + Host::kill_connection(token, &mut net_context, &mut self.handlers); } if create_session { - self.start_session(token, event_loop); + Host::start_session(&self.info, token, &mut net_context); } for p in ready_data { let mut h = self.handlers.get_mut(p).unwrap(); - h.connected(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize()); + net_context.set_protocol(p); + h.connected(&mut net_context, &token); } if let Some((p, packet_id, data)) = packet_data { let mut h = self.handlers.get_mut(p).unwrap(); - h.read(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize(), packet_id, &data[1..]); + net_context.set_protocol(p); + h.read(&mut net_context, &token, packet_id, &data[1..]); } - match self.connections.get_mut(token) { + match net_context.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.reregister(event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); + s.reregister(net_context.io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, _ => (), } } - fn start_session(&mut self, token: Token, event_loop: &mut EventLoop) { - let info = &self.info; - self.connections.replace_with(token, |c| { + fn start_session(info: &HostInfo, token: StreamToken, io: &mut NetworkContext) { + let event_loop = &mut io.io.event_loop; + io.connections.replace_with(Token(token), |c| { match c { ConnectionEntry::Handshake(h) => Session::new(h, event_loop, info) .map(|s| Some(ConnectionEntry::Session(s))) @@ -616,16 +492,17 @@ impl Host { }).expect("Error updating slab with session"); } - fn connection_timeout(&mut self, token: Token, event_loop: &mut EventLoop) { - self.kill_connection(token, event_loop) + fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: IoContext<'s, NetworkIoMessage>) { + let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); + Host::kill_connection(token, &mut net_context, &mut self.handlers) } - fn kill_connection(&mut self, token: Token, event_loop: &mut EventLoop) { + fn kill_connection(token: StreamToken, io: &mut NetworkContext, handlers: &mut HashMap>>) { let mut to_disconnect: Vec = Vec::new(); - match self.connections.get_mut(token) { + match io.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Handshake(_)) => (), // just abandon handshake Some(&mut ConnectionEntry::Session(ref mut s)) if s.is_ready() => { - for (p, _) in self.handlers.iter_mut() { + for (p, _) in handlers.iter_mut() { if s.have_capability(p) { to_disconnect.push(p); } @@ -634,87 +511,103 @@ impl Host { _ => (), } for p in to_disconnect { - let mut h = self.handlers.get_mut(p).unwrap(); - h.disconnected(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize()); + let mut h = handlers.get_mut(p).unwrap(); + io.set_protocol(p); + h.disconnected(io, &token); } - self.connections.remove(token); + io.connections.remove(Token(token)); } } -impl Handler for Host { - type Timeout = Token; - type Message = HostMessage; +impl IoHandler> for Host where Message: Send + 'static { + /// Initialize networking + fn initialize(&mut self, io: IoContext>) { + /* + match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { + Some(iface) => config.public_address = iface.addr.unwrap(), + None => warn!("No public network interface"), + */ - fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { - if events.is_hup() { - trace!(target: "net", "hup"); - match token.as_usize() { - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_closed(token, event_loop), - _ => warn!(target: "net", "Unexpected hup"), - }; - } - else if events.is_readable() { - match token.as_usize() { - TCP_ACCEPT => self.accept(event_loop), - IDLE => self.maintain_network(event_loop), - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(token, event_loop), - NODETABLE_RECEIVE => {}, - _ => panic!("Received unknown readable token"), - } - } - else if events.is_writable() { - match token.as_usize() { - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(token, event_loop), - _ => panic!("Received unknown writable token"), - } + // Start listening for incoming connections + io.event_loop.register(&self.listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap(); + io.event_loop.timeout_ms(Token(IDLE), 1000).unwrap(); //TODO: check delay + // open the udp socket + io.event_loop.register(&self.udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap(); + io.event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap(); + let port = self.info.config.listen_address.port(); + self.info.listen_port = port; + + self.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303"); + self.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@gav.ethdev.com:30300"); + self.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc@gav.ethdev.com:30303"); + self.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); + self.add_node("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034@54.207.93.166:30303"); + self.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@92.51.165.126:30303"); + } + + fn stream_hup(&mut self, io: IoContext>, stream: StreamToken) { + trace!(target: "net", "Hup: {}", stream); + match stream { + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_closed(stream, io), + _ => warn!(target: "net", "Unexpected hup"), + }; + } + + fn stream_readable(&mut self, io: IoContext>, stream: StreamToken) { + match stream { + TCP_ACCEPT => self.accept(io), + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(stream, io), + NODETABLE_RECEIVE => {}, + _ => panic!("Received unknown readable token"), } } - fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { - match token.as_usize() { - IDLE => self.maintain_network(event_loop), - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, event_loop), + fn stream_writable(&mut self, io: IoContext>, stream: StreamToken) { + match stream { + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(stream, io), + _ => panic!("Received unknown writable token"), + } + } + + fn timeout(&mut self, io: IoContext>, token: TimerToken) { + match token { + IDLE => self.maintain_network(io), + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, io), NODETABLE_DISCOVERY => {}, NODETABLE_MAINTAIN => {}, - USER_TIMER ... LAST_USER_TIMER => { - let (protocol, delay) = { - let timer = self.timers.get_mut(token).expect("Unknown user timer token"); - (timer.protocol, timer.delay) - }; + _ => { + let protocol: ProtocolId = self.timers.get_mut(&token).expect("Unknown user timer token"); match self.handlers.get_mut(protocol) { None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, Some(h) => { - h.timeout(&mut HostIo::new(protocol, None, event_loop, &mut self.connections, &mut self.timers), token.as_usize()); - event_loop.timeout_ms(token, delay).expect("Error re-registering user timer"); + h.timeout(&mut NetworkContext::new(io, Some(protocol), Some(token), &mut self.connections, &mut self.timers), token); } } } - _ => panic!("Unknown timer token"), } } - fn notify(&mut self, event_loop: &mut EventLoop, msg: Self::Message) { - match msg { - HostMessage::Shutdown => event_loop.shutdown(), - HostMessage::AddHandler { - handler, - protocol, - versions + fn message(&mut self, io: IoContext>, message: &mut NetworkIoMessage) { + match message { + &mut NetworkIoMessage::AddHandler { + ref mut handler, + ref protocol, + ref versions } => { - self.handlers.insert(protocol, handler); + self.handlers.insert(protocol, mem::replace(handler, None).unwrap()); for v in versions { - self.info.capabilities.push(CapabilityInfo { protocol: protocol, version: v, packet_count:0 }); + self.info.capabilities.push(CapabilityInfo { protocol: protocol, version: *v, packet_count:0 }); } }, - HostMessage::Send { - peer, - packet_id, - protocol, - data, + &mut NetworkIoMessage::Send { + ref peer, + ref packet_id, + ref protocol, + ref data, } => { - match self.connections.get_mut(Token(peer as usize)) { + match self.connections.get_mut(Token(*peer as usize)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.send_packet(protocol, packet_id as u8, &data).unwrap_or_else(|e| { + s.send_packet(protocol, *packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); }); //TODO: don't copy vector data }, @@ -723,10 +616,15 @@ impl Handler for Host { } } }, - HostMessage::UserMessage(message) => { + &mut NetworkIoMessage::User { + ref protocol, + ref message + } => { + let mut net_context = NetworkContext::new(io, None, None, &mut self.connections, &mut self.timers); for (p, h) in self.handlers.iter_mut() { - if p != &message.protocol { - h.message(&mut HostIo::new(message.protocol, None, event_loop, &mut self.connections, &mut self.timers), &message); + if p != protocol { + net_context.set_protocol(p); + h.message(&mut net_context, &message); } } } diff --git a/src/network/mod.rs b/src/network/mod.rs index cdce08d00..1a1450cb6 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -9,27 +9,27 @@ /// struct MyHandler; /// /// impl ProtocolHandler for MyHandler { -/// fn initialize(&mut self, io: &mut HandlerIo) { +/// fn initialize(&mut self, io: &mut NetworkContext) { /// io.register_timer(1000); /// } /// -/// fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]) { +/// fn read(&mut self, io: &mut NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { /// println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer); /// } /// -/// fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId) { +/// fn connected(&mut self, io: &mut NetworkContext, peer: &PeerId) { /// println!("Connected {}", peer); /// } /// -/// fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId) { +/// fn disconnected(&mut self, io: &mut NetworkContext, peer: &PeerId) { /// println!("Disconnected {}", peer); /// } /// -/// fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken) { +/// fn timeout(&mut self, io: &mut NetworkContext, timer: TimerToken) { /// println!("Timeout {}", timer); /// } /// -/// fn message(&mut self, io: &mut HandlerIo, message: &Message) { +/// fn message(&mut self, io: &mut NetworkContext, message: &Message) { /// println!("Message {}:{}", message.protocol, message.id); /// } /// } @@ -50,69 +50,31 @@ mod handshake; mod session; mod discovery; mod service; - -#[derive(Debug, Copy, Clone)] -pub enum DisconnectReason -{ - DisconnectRequested, - TCPError, - BadProtocol, - UselessPeer, - TooManyPeers, - DuplicatePeer, - IncompatibleProtocol, - NullIdentity, - ClientQuit, - UnexpectedIdentity, - LocalIdentity, - PingTimeout, -} - -#[derive(Debug)] -pub enum NetworkError { - Auth, - BadProtocol, - PeerNotFound, - Disconnect(DisconnectReason), - Mio(::std::io::Error), -} - -impl From<::rlp::DecoderError> for NetworkError { - fn from(_err: ::rlp::DecoderError) -> NetworkError { - NetworkError::Auth - } -} - -impl From<::mio::NotifyError> for NetworkError { - fn from(_err: ::mio::NotifyError) -> NetworkError { - NetworkError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) - } -} +mod error; +mod node; pub type PeerId = host::PeerId; pub type PacketId = host::PacketId; -pub type TimerToken = host::TimerToken; -pub type HandlerIo<'s> = host::HostIo<'s>; -pub type Message = host::UserMessage; -pub type MessageId = host::UserMessageId; +pub type NetworkContext<'s, Message> = host::NetworkContext<'s, Message>; +pub type NetworkService = service::NetworkService; +pub type NetworkIoMessage = host::NetworkIoMessage; +pub type NetworkError = error::NetworkError; + +use io::*; /// Network IO protocol handler. This needs to be implemented for each new subprotocol. -/// TODO: Separate p2p networking IO from IPC IO. `timeout` and `message` should go to a more genera IO provider. /// All the handler function are called from within IO event loop. -pub trait ProtocolHandler: Send { - /// Initialize the hadler - fn initialize(&mut self, io: &mut HandlerIo); +/// `Message` is the type for message data. +pub trait NetworkProtocolHandler: Send where Message: Send { /// Called when new network packet received. - fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]); + fn read(&mut self, io: &mut NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]); /// Called when new peer is connected. Only called when peer supports the same protocol. - fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId); + fn connected(&mut self, io: &mut NetworkContext, peer: &PeerId); /// Called when a previously connected peer disconnects. - fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId); - /// Timer function called after a timeout created with `HandlerIo::timeout`. - fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken); - /// Called when a broadcasted message is received. The message can only be sent from a different protocol handler. - fn message(&mut self, io: &mut HandlerIo, message: &Message); + fn disconnected(&mut self, io: &mut NetworkContext, peer: &PeerId); + /// Timer function called after a timeout created with `NetworkContext::timeout`. + fn timeout(&mut self, _io: &mut NetworkContext, _timer: TimerToken) {} + /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. + fn message(&mut self, _io: &mut NetworkContext, _message: &Message) {} } -pub type NetworkService = service::NetworkService; - diff --git a/src/network/service.rs b/src/network/service.rs index 23cbb57d9..be70e2ee1 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -1,32 +1,28 @@ -use std::thread::{self, JoinHandle}; -use mio::*; use error::*; -use network::{NetworkError, ProtocolHandler}; -use network::host::{Host, HostMessage, PeerId, PacketId, ProtocolId}; +use network::{NetworkProtocolHandler}; +use network::error::{NetworkError}; +use network::host::{Host, NetworkIoMessage, PeerId, PacketId, ProtocolId}; +use io::*; /// IO Service with networking -pub struct NetworkService { - thread: Option>, - host_channel: Sender +/// `Message` defines a notification data type. +pub struct NetworkService where Message: Send + 'static { + io_service: IoService>, } -impl NetworkService { +impl NetworkService where Message: Send + 'static { /// Starts IO event loop - pub fn start() -> Result { - let mut event_loop = EventLoop::new().unwrap(); - let channel = event_loop.channel(); - let thread = thread::spawn(move || { - Host::start(&mut event_loop).unwrap(); //TODO: - }); + pub fn start() -> Result, UtilError> { + let mut io_service = try!(IoService::>::start()); + try!(io_service.register_handler(Box::new(Host::new()))); Ok(NetworkService { - thread: Some(thread), - host_channel: channel + io_service: io_service }) } /// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads. pub fn send(&mut self, peer: &PeerId, packet_id: PacketId, protocol: ProtocolId, data: &[u8]) -> Result<(), NetworkError> { - try!(self.host_channel.send(HostMessage::Send { + try!(self.io_service.send_message(NetworkIoMessage::Send { peer: *peer, packet_id: packet_id, protocol: protocol, @@ -36,9 +32,9 @@ impl NetworkService { } /// Regiter a new protocol handler with the event loop. - pub fn register_protocol(&mut self, handler: Box, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> { - try!(self.host_channel.send(HostMessage::AddHandler { - handler: handler, + pub fn register_protocol(&mut self, handler: Box+Send>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> { + try!(self.io_service.send_message(NetworkIoMessage::AddHandler { + handler: Some(handler), protocol: protocol, versions: versions.to_vec(), })); @@ -46,10 +42,3 @@ impl NetworkService { } } -impl Drop for NetworkService { - fn drop(&mut self) { - self.host_channel.send(HostMessage::Shutdown).unwrap(); - self.thread.take().unwrap().join().unwrap(); - } -} - diff --git a/src/network/session.rs b/src/network/session.rs index 96390d366..2d8ba2245 100644 --- a/src/network/session.rs +++ b/src/network/session.rs @@ -4,8 +4,9 @@ use rlp::*; use network::connection::{EncryptedConnection, Packet}; use network::handshake::Handshake; use error::*; -use network::{NetworkError, DisconnectReason}; +use network::error::{NetworkError, DisconnectReason}; use network::host::*; +use network::node::NodeId; /// Peer session over encrypted connection. /// When created waits for Hello packet exchange and signals ready state. @@ -83,7 +84,7 @@ const PACKET_LAST: u8 = 0x7f; impl Session { /// Create a new session out of comepleted handshake. Consumes handshake object. - pub fn new(h: Handshake, event_loop: &mut EventLoop, host: &HostInfo) -> Result { + pub fn new>(h: Handshake, event_loop: &mut EventLoop, host: &HostInfo) -> Result { let id = h.id.clone(); let connection = try!(EncryptedConnection::new(h)); let mut session = Session { @@ -108,7 +109,7 @@ impl Session { } /// Readable IO handler. Returns packet data if available. - pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result { + pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result { match try!(self.connection.readable(event_loop)) { Some(data) => Ok(try!(self.read_packet(data, host))), None => Ok(SessionData::None) @@ -116,7 +117,7 @@ impl Session { } /// Writable IO handler. Sends pending packets. - pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { + pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { self.connection.writable(event_loop) } @@ -126,7 +127,7 @@ impl Session { } /// Update registration with the event loop. Should be called at the end of the IO handler. - pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.connection.reregister(event_loop) }