diff --git a/src/io/mod.rs b/src/io/mod.rs index 72906b6f4..497e1ae78 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -3,47 +3,38 @@ /// Example usage for craeting a network service and adding an IO handler: /// /// ```rust -/// extern crate ethcore_util as util; -/// use util::network::*; +/// extern crate ethcore_util; +/// use ethcore_util::*; /// /// struct MyHandler; /// -/// impl ProtocolHandler for MyHandler { -/// fn initialize(&mut self, io: &mut HandlerIo) { -/// io.register_timer(1000); +/// struct MyMessage { +/// data: u32 +/// } +/// +/// impl IoHandler for MyHandler { +/// fn initialize(&mut self, io: &mut IoContext) { +/// io.register_timer(1000).unwrap(); /// } /// -/// fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]) { -/// println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer); -/// } -/// -/// fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId) { -/// println!("Connected {}", peer); -/// } -/// -/// fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId) { -/// println!("Disconnected {}", peer); -/// } -/// -/// fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken) { +/// fn timeout(&mut self, _io: &mut IoContext, timer: TimerToken) { /// println!("Timeout {}", timer); /// } /// -/// fn message(&mut self, io: &mut HandlerIo, message: &Message) { -/// println!("Message {}:{}", message.protocol, message.id); +/// fn message(&mut self, _io: &mut IoContext, message: &mut MyMessage) { +/// println!("Message {}", message.data); /// } -/// } +/// } /// /// fn main () { -/// let mut service = NetworkService::start().expect("Error creating network service"); -/// service.register_protocol(Box::new(MyHandler), "myproto", &[1u8]); +/// let mut service = IoService::::start().expect("Error creating network service"); +/// service.register_handler(Box::new(MyHandler)).unwrap(); /// /// // Wait for quit condition /// // ... /// // Drop the service /// } /// ``` -extern crate mio; mod service; #[derive(Debug)] @@ -51,8 +42,8 @@ pub enum IoError { Mio(::std::io::Error), } -impl From<::mio::NotifyError>> for IoError where M: Send { - fn from(_err: ::mio::NotifyError>) -> IoError { +impl From<::mio::NotifyError>> for IoError where Message: Send { + fn from(_err: ::mio::NotifyError>) -> IoError { IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) } } @@ -62,17 +53,17 @@ impl From<::mio::NotifyError>> for IoError where M: Sen /// `Message` type is used as notification data pub trait IoHandler: Send where Message: Send + 'static { /// Initialize the handler - fn initialize(&mut self, _io: IoContext) {} + fn initialize<'s>(&'s mut self, _io: &mut IoContext<'s, Message>) {} /// Timer function called after a timeout created with `HandlerIo::timeout`. - fn timeout(&mut self, _io: IoContext, _timer: TimerToken) {} + fn timeout<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _timer: TimerToken) {} /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. - fn message(&mut self, _io: IoContext, _message: &mut Message) {} // TODO: make message immutable and provide internal channel for adding network handler + fn message<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _message: &'s mut Message) {} // TODO: make message immutable and provide internal channel for adding network handler /// Called when an IO stream gets closed - fn stream_hup(&mut self, _io: IoContext, _stream: StreamToken) {} + fn stream_hup<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _stream: StreamToken) {} /// Called when an IO stream can be read from - fn stream_readable(&mut self, _io: IoContext, _stream: StreamToken) {} + fn stream_readable<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _stream: StreamToken) {} /// Called when an IO stream can be written to - fn stream_writable(&mut self, _io: IoContext, _stream: StreamToken) {} + fn stream_writable<'s>(&'s mut self, _io: &mut IoContext<'s, Message>, _stream: StreamToken) {} } pub type TimerToken = service::TimerToken; @@ -83,3 +74,35 @@ pub const USER_TOKEN_START: usize = service::USER_TOKEN; +#[cfg(test)] +mod tests { + + use io::*; + + struct MyHandler; + + struct MyMessage { + data: u32 + } + + impl IoHandler for MyHandler { + fn initialize(&mut self, io: &mut IoContext) { + io.register_timer(1000).unwrap(); + } + + fn timeout(&mut self, _io: &mut IoContext, timer: TimerToken) { + println!("Timeout {}", timer); + } + + fn message(&mut self, _io: &mut IoContext, message: &mut MyMessage) { + println!("Message {}", message.data); + } + } + + #[test] + fn test_service_register_handler () { + let mut service = IoService::::start().expect("Error creating network service"); + service.register_handler(Box::new(MyHandler)).unwrap(); + } + +} diff --git a/src/io/service.rs b/src/io/service.rs index 66814ba3c..aab8f5c2d 100644 --- a/src/io/service.rs +++ b/src/io/service.rs @@ -16,26 +16,26 @@ const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; pub const USER_TOKEN: usize = LAST_USER_TIMER + 1; /// Messages used to communicate with the event loop from other threads. -pub enum IoMessage where M: Send + Sized { +pub enum IoMessage where Message: Send + Sized { /// Shutdown the event loop Shutdown, /// Register a new protocol handler. AddHandler { - handler: Box+Send>, + handler: Box+Send>, }, /// Broadcast a message across all protocol handlers. - UserMessage(M) + UserMessage(Message) } /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. -pub struct IoContext<'s, M> where M: Send + 'static { +pub struct IoContext<'s, Message> where Message: Send + 'static { timers: &'s mut Slab, - pub event_loop: &'s mut EventLoop>, + pub event_loop: &'s mut EventLoop>, } -impl<'s, M> IoContext<'s, M> where M: Send + 'static { +impl<'s, Message> IoContext<'s, Message> where Message: Send + 'static { /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler. - fn new(event_loop: &'s mut EventLoop>, timers: &'s mut Slab) -> IoContext<'s, M> { + fn new(event_loop: &'s mut EventLoop>, timers: &'s mut Slab) -> IoContext<'s, Message> { IoContext { event_loop: event_loop, timers: timers, @@ -56,7 +56,7 @@ impl<'s, M> IoContext<'s, M> where M: Send + 'static { } /// Broadcast a message to other IO clients - pub fn message(&mut self, message: M) { + pub fn message(&mut self, message: Message) { match self.event_loop.channel().send(IoMessage::UserMessage(message)) { Ok(_) => {} Err(e) => { panic!("Error sending io message {:?}", e); } @@ -69,14 +69,14 @@ struct UserTimer { } /// Root IO handler. Manages user handlers, messages and IO timers. -pub struct IoManager where M: Send { +pub struct IoManager where Message: Send { timers: Slab, - handlers: Vec>>, + handlers: Vec>>, } -impl IoManager where M: Send + 'static { +impl IoManager where Message: Send + 'static { /// Creates a new instance and registers it with the event loop. - pub fn start(event_loop: &mut EventLoop>) -> Result<(), UtilError> { + pub fn start(event_loop: &mut EventLoop>) -> Result<(), UtilError> { let mut io = IoManager { timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS), handlers: Vec::new(), @@ -86,24 +86,24 @@ impl IoManager where M: Send + 'static { } } -impl Handler for IoManager where M: Send + 'static { +impl Handler for IoManager where Message: Send + 'static { type Timeout = Token; - type Message = IoMessage; + type Message = IoMessage; fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { if events.is_hup() { for h in self.handlers.iter_mut() { - h.stream_hup(IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.stream_hup(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } else if events.is_readable() { for h in self.handlers.iter_mut() { - h.stream_readable(IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.stream_readable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } else if events.is_writable() { for h in self.handlers.iter_mut() { - h.stream_writable(IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.stream_writable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } } @@ -116,13 +116,13 @@ impl Handler for IoManager where M: Send + 'static { timer.delay }; for h in self.handlers.iter_mut() { - h.timeout(IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); } event_loop.timeout_ms(token, delay).expect("Error re-registering user timer"); } _ => { // Just pass the event down. IoHandler is supposed to re-register it if required. for h in self.handlers.iter_mut() { - h.timeout(IoContext::new(event_loop, &mut self.timers), token.as_usize()); + h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); } } } @@ -139,7 +139,7 @@ impl Handler for IoManager where M: Send + 'static { }, IoMessage::UserMessage(ref mut data) => { for h in self.handlers.iter_mut() { - h.message(IoContext::new(event_loop, &mut self.timers), data); + h.message(&mut IoContext::new(event_loop, &mut self.timers), data); } } } @@ -147,19 +147,19 @@ impl Handler for IoManager where M: Send + 'static { } /// General IO Service. Starts an event loop and dispatches IO requests. -/// 'M' is a notification message type -pub struct IoService where M: Send + 'static { +/// 'Message' is a notification message type +pub struct IoService where Message: Send + 'static { thread: Option>, - host_channel: Sender> + host_channel: Sender> } -impl IoService where M: Send + 'static { +impl IoService where Message: Send + 'static { /// Starts IO event loop - pub fn start() -> Result, UtilError> { + pub fn start() -> Result, UtilError> { let mut event_loop = EventLoop::new().unwrap(); let channel = event_loop.channel(); let thread = thread::spawn(move || { - IoManager::::start(&mut event_loop).unwrap(); //TODO: + IoManager::::start(&mut event_loop).unwrap(); //TODO: }); Ok(IoService { thread: Some(thread), @@ -168,7 +168,7 @@ impl IoService where M: Send + 'static { } /// Regiter a IO hadnler with the event loop. - pub fn register_handler(&mut self, handler: Box+Send>) -> Result<(), IoError> { + pub fn register_handler(&mut self, handler: Box+Send>) -> Result<(), IoError> { try!(self.host_channel.send(IoMessage::AddHandler { handler: handler, })); @@ -176,13 +176,13 @@ impl IoService where M: Send + 'static { } /// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads. - pub fn send_message(&mut self, message: M) -> Result<(), IoError> { + pub fn send_message(&mut self, message: Message) -> Result<(), IoError> { try!(self.host_channel.send(IoMessage::UserMessage(message))); Ok(()) } } -impl Drop for IoService where M: Send { +impl Drop for IoService where Message: Send { fn drop(&mut self) { self.host_channel.send(IoMessage::Shutdown).unwrap(); self.thread.take().unwrap().join().unwrap(); diff --git a/src/lib.rs b/src/lib.rs index acd96ee5f..fdab5e92a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,3 +89,4 @@ pub use heapsizeof::*; pub use squeeze::*; pub use semantic_version::*; pub use network::*; +pub use io::*; diff --git a/src/network/host.rs b/src/network/host.rs index d61ef614c..8917b1f8e 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -1,5 +1,5 @@ use std::mem; -use std::net::{SocketAddr }; +use std::net::{SocketAddr}; use std::collections::{HashMap}; use std::hash::{Hasher}; use std::str::{FromStr}; @@ -103,20 +103,20 @@ impl Encodable for CapabilityInfo { } /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. -pub struct NetworkContext<'s, Message> where Message: Send + 'static { - io: IoContext<'s, NetworkIoMessage>, - protocol: Option, +pub struct NetworkContext<'s, 'io, Message> where Message: Send + 'static, 'io: 's { + io: &'s mut IoContext<'io, NetworkIoMessage>, + protocol: ProtocolId, connections: &'s mut Slab, timers: &'s mut HashMap, session: Option, } -impl<'s, Message> NetworkContext<'s, Message> where Message: Send + 'static, { +impl<'s, 'io, Message> NetworkContext<'s, 'io, Message> where Message: Send + 'static, { /// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler. - fn new(io: IoContext<'s, NetworkIoMessage>, - protocol: Option, + fn new(io: &'s mut IoContext<'io, NetworkIoMessage>, + protocol: ProtocolId, session: Option, connections: &'s mut Slab, - timers: &'s mut HashMap) -> NetworkContext<'s, Message> { + timers: &'s mut HashMap) -> NetworkContext<'s, 'io, Message> { NetworkContext { io: io, protocol: protocol, @@ -126,15 +126,11 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + 'static, { } } - fn set_protocol(&mut self, protocol: ProtocolId) { - self.protocol = Some(protocol); - } - /// Send a packet over the network to another peer. pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { match self.connections.get_mut(Token(peer)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.send_packet(self.protocol.unwrap(), packet_id as u8, &data).unwrap_or_else(|e| { + s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); }); //TODO: don't copy vector data }, @@ -164,7 +160,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + 'static, { pub fn register_timer(&mut self, ms: u64) -> Result{ match self.io.register_timer(ms) { Ok(token) => { - self.timers.insert(token, self.protocol.unwrap()); + self.timers.insert(token, self.protocol); Ok(token) }, e @ Err(_) => e, @@ -259,7 +255,7 @@ impl Host where Message: Send { } } - fn maintain_network(&mut self, io: IoContext>) { + fn maintain_network(&mut self, io: &mut IoContext>) { self.connect_peers(io); } @@ -271,7 +267,7 @@ impl Host where Message: Send { self.connections.iter().any(|e| match e { &ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false }) } - fn connect_peers(&mut self, mut io: IoContext>) { + fn connect_peers(&mut self, io: &mut IoContext>) { struct NodeInfo { id: NodeId, peer_type: PeerType @@ -296,7 +292,7 @@ impl Host where Message: Send { for n in to_connect.iter() { if n.peer_type == PeerType::Required { if req_conn < IDEAL_PEERS { - self.connect_peer(&n.id, &mut io); + self.connect_peer(&n.id, io); } req_conn += 1; } @@ -311,7 +307,7 @@ impl Host where Message: Send { for n in to_connect.iter() { if n.peer_type == PeerType::Optional && open_slots > 0 { open_slots -= 1; - self.connect_peer(&n.id, &mut io); + self.connect_peer(&n.id, io); } } } @@ -362,11 +358,11 @@ impl Host where Message: Send { } - fn accept(&mut self, _io: IoContext>) { + fn accept(&mut self, _io: &mut IoContext>) { trace!(target: "net", "accept"); } - fn connection_writable<'s>(&'s mut self, token: StreamToken, io: IoContext<'s, NetworkIoMessage>) { + fn connection_writable<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { let mut kill = false; let mut create_session = false; match self.connections.get_mut(Token(token)) { @@ -387,27 +383,25 @@ impl Host where Message: Send { warn!(target: "net", "Received event for unknown connection"); } } - let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); if kill { - Host::kill_connection(token, &mut net_context, &mut self.handlers); + self.kill_connection(token, io); + return; + } else if create_session { + self.start_session(token, io); } - if create_session { - Host::start_session(&self.info, token, &mut net_context); - } - match net_context.connections.get_mut(Token(token)) { + match self.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.reregister(net_context.io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); + s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, _ => (), } } - fn connection_closed<'s>(&'s mut self, token: TimerToken, io: IoContext<'s, NetworkIoMessage>) { - let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); - Host::kill_connection(token, &mut net_context, &mut self.handlers); + fn connection_closed<'s>(&'s mut self, token: TimerToken, io: &mut IoContext<'s, NetworkIoMessage>) { + self.kill_connection(token, io); } - fn connection_readable<'s>(&'s mut self, token: StreamToken, io: IoContext<'s, NetworkIoMessage>) { + fn connection_readable<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { let mut kill = false; let mut create_session = false; let mut ready_data: Vec = Vec::new(); @@ -451,37 +445,35 @@ impl Host where Message: Send { warn!(target: "net", "Received event for unknown connection"); } } - let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); if kill { - Host::kill_connection(token, &mut net_context, &mut self.handlers); + self.kill_connection(token, io); + return; } if create_session { - Host::start_session(&self.info, token, &mut net_context); + self.start_session(token, io); } for p in ready_data { let mut h = self.handlers.get_mut(p).unwrap(); - net_context.set_protocol(p); - h.connected(&mut net_context, &token); + h.connected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token); } if let Some((p, packet_id, data)) = packet_data { let mut h = self.handlers.get_mut(p).unwrap(); - net_context.set_protocol(p); - h.read(&mut net_context, &token, packet_id, &data[1..]); + h.read(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token, packet_id, &data[1..]); } - match net_context.connections.get_mut(Token(token)) { + match self.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.reregister(net_context.io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); + s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, _ => (), } } - fn start_session(info: &HostInfo, token: StreamToken, io: &mut NetworkContext) { - let event_loop = &mut io.io.event_loop; - io.connections.replace_with(Token(token), |c| { + fn start_session(&mut self, token: StreamToken, io: &mut IoContext>) { + let info = &self.info; + self.connections.replace_with(Token(token), |c| { match c { - ConnectionEntry::Handshake(h) => Session::new(h, event_loop, info) + ConnectionEntry::Handshake(h) => Session::new(h, io.event_loop, info) .map(|s| Some(ConnectionEntry::Session(s))) .unwrap_or_else(|e| { debug!(target: "net", "Session construction error: {:?}", e); @@ -492,17 +484,16 @@ impl Host where Message: Send { }).expect("Error updating slab with session"); } - fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: IoContext<'s, NetworkIoMessage>) { - let mut net_context = NetworkContext::new(io, None, Some(token), &mut self.connections, &mut self.timers); - Host::kill_connection(token, &mut net_context, &mut self.handlers) + fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { + self.kill_connection(token, io) } - fn kill_connection(token: StreamToken, io: &mut NetworkContext, handlers: &mut HashMap>>) { + fn kill_connection<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { let mut to_disconnect: Vec = Vec::new(); - match io.connections.get_mut(Token(token)) { + match self.connections.get_mut(Token(token)) { Some(&mut ConnectionEntry::Handshake(_)) => (), // just abandon handshake Some(&mut ConnectionEntry::Session(ref mut s)) if s.is_ready() => { - for (p, _) in handlers.iter_mut() { + for (p, _) in self.handlers.iter_mut() { if s.have_capability(p) { to_disconnect.push(p); } @@ -511,17 +502,16 @@ impl Host where Message: Send { _ => (), } for p in to_disconnect { - let mut h = handlers.get_mut(p).unwrap(); - io.set_protocol(p); - h.disconnected(io, &token); + let mut h = self.handlers.get_mut(p).unwrap(); + h.disconnected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token); } - io.connections.remove(Token(token)); + self.connections.remove(Token(token)); } } impl IoHandler> for Host where Message: Send + 'static { /// Initialize networking - fn initialize(&mut self, io: IoContext>) { + fn initialize(&mut self, io: &mut IoContext>) { /* match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { Some(iface) => config.public_address = iface.addr.unwrap(), @@ -545,7 +535,7 @@ impl IoHandler> for Host where Messa self.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@92.51.165.126:30303"); } - fn stream_hup(&mut self, io: IoContext>, stream: StreamToken) { + fn stream_hup<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, stream: StreamToken) { trace!(target: "net", "Hup: {}", stream); match stream { FIRST_CONNECTION ... LAST_CONNECTION => self.connection_closed(stream, io), @@ -553,7 +543,7 @@ impl IoHandler> for Host where Messa }; } - fn stream_readable(&mut self, io: IoContext>, stream: StreamToken) { + fn stream_readable<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, stream: StreamToken) { match stream { TCP_ACCEPT => self.accept(io), FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(stream, io), @@ -562,39 +552,41 @@ impl IoHandler> for Host where Messa } } - fn stream_writable(&mut self, io: IoContext>, stream: StreamToken) { + fn stream_writable<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, stream: StreamToken) { match stream { FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(stream, io), _ => panic!("Received unknown writable token"), } } - fn timeout(&mut self, io: IoContext>, token: TimerToken) { + fn timeout<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, token: TimerToken) { match token { IDLE => self.maintain_network(io), FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, io), NODETABLE_DISCOVERY => {}, NODETABLE_MAINTAIN => {}, _ => { - let protocol: ProtocolId = self.timers.get_mut(&token).expect("Unknown user timer token"); + let protocol = *self.timers.get_mut(&token).expect("Unknown user timer token"); match self.handlers.get_mut(protocol) { None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, Some(h) => { - h.timeout(&mut NetworkContext::new(io, Some(protocol), Some(token), &mut self.connections, &mut self.timers), token); + h.timeout(&mut NetworkContext::new(io, protocol, Some(token), &mut self.connections, &mut self.timers), token); } } } } } - fn message(&mut self, io: IoContext>, message: &mut NetworkIoMessage) { + fn message<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage>, message: &'s mut NetworkIoMessage) { match message { &mut NetworkIoMessage::AddHandler { ref mut handler, ref protocol, ref versions } => { - self.handlers.insert(protocol, mem::replace(handler, None).unwrap()); + let mut h = mem::replace(handler, None).unwrap(); + h.initialize(&mut NetworkContext::new(io, protocol, None, &mut self.connections, &mut self.timers)); + self.handlers.insert(protocol, h); for v in versions { self.info.capabilities.push(CapabilityInfo { protocol: protocol, version: *v, packet_count:0 }); } @@ -620,11 +612,9 @@ impl IoHandler> for Host where Messa ref protocol, ref message } => { - let mut net_context = NetworkContext::new(io, None, None, &mut self.connections, &mut self.timers); for (p, h) in self.handlers.iter_mut() { if p != protocol { - net_context.set_protocol(p); - h.message(&mut net_context, &message); + h.message(&mut NetworkContext::new(io, p, None, &mut self.connections, &mut self.timers), &message); } } } diff --git a/src/network/mod.rs b/src/network/mod.rs index 1a1450cb6..7bc7ee71c 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -4,38 +4,42 @@ /// /// ```rust /// extern crate ethcore_util as util; -/// use util::network::*; +/// use util::*; /// /// struct MyHandler; /// -/// impl ProtocolHandler for MyHandler { -/// fn initialize(&mut self, io: &mut NetworkContext) { +/// struct MyMessage { +/// data: u32 +/// } +/// +/// impl NetworkProtocolHandler for MyHandler { +/// fn initialize(&mut self, io: &mut NetworkContext) { /// io.register_timer(1000); /// } /// -/// fn read(&mut self, io: &mut NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { +/// fn read(&mut self, io: &mut NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { /// println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer); /// } /// -/// fn connected(&mut self, io: &mut NetworkContext, peer: &PeerId) { +/// fn connected(&mut self, io: &mut NetworkContext, peer: &PeerId) { /// println!("Connected {}", peer); /// } /// -/// fn disconnected(&mut self, io: &mut NetworkContext, peer: &PeerId) { +/// fn disconnected(&mut self, io: &mut NetworkContext, peer: &PeerId) { /// println!("Disconnected {}", peer); /// } /// -/// fn timeout(&mut self, io: &mut NetworkContext, timer: TimerToken) { +/// fn timeout(&mut self, io: &mut NetworkContext, timer: TimerToken) { /// println!("Timeout {}", timer); /// } /// -/// fn message(&mut self, io: &mut NetworkContext, message: &Message) { -/// println!("Message {}:{}", message.protocol, message.id); +/// fn message(&mut self, io: &mut NetworkContext, message: &MyMessage) { +/// println!("Message {}", message.data); /// } /// } /// /// fn main () { -/// let mut service = NetworkService::start().expect("Error creating network service"); +/// let mut service = NetworkService::::start().expect("Error creating network service"); /// service.register_protocol(Box::new(MyHandler), "myproto", &[1u8]); /// /// // Wait for quit condition @@ -43,7 +47,6 @@ /// // Drop the service /// } /// ``` -extern crate mio; mod host; mod connection; mod handshake; @@ -55,7 +58,7 @@ mod node; pub type PeerId = host::PeerId; pub type PacketId = host::PacketId; -pub type NetworkContext<'s, Message> = host::NetworkContext<'s, Message>; +pub type NetworkContext<'s,'io, Message> = host::NetworkContext<'s, 'io, Message>; pub type NetworkService = service::NetworkService; pub type NetworkIoMessage = host::NetworkIoMessage; pub type NetworkError = error::NetworkError; @@ -66,6 +69,8 @@ use io::*; /// All the handler function are called from within IO event loop. /// `Message` is the type for message data. pub trait NetworkProtocolHandler: Send where Message: Send { + /// Initialize the handler + fn initialize(&mut self, _io: &mut NetworkContext) {} /// Called when new network packet received. fn read(&mut self, io: &mut NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]); /// Called when new peer is connected. Only called when peer supports the same protocol.