From cd250d4959dfb423292192f1af11b92c5ca5ac8c Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 24 Jan 2016 18:53:54 +0100 Subject: [PATCH] Incoming connections; Tests --- cov.sh | 2 +- src/bin/client/main.rs | 2 +- util/src/network/connection.rs | 26 ++++++--- util/src/network/handshake.rs | 25 +++++--- util/src/network/host.rs | 53 +++++++++++++---- util/src/network/mod.rs | 56 ++---------------- util/src/network/service.rs | 9 +++ util/src/network/stats.rs | 51 ++++++++++++++++ util/src/network/tests.rs | 103 +++++++++++++++++++++++++++++++++ 9 files changed, 249 insertions(+), 78 deletions(-) create mode 100644 util/src/network/stats.rs create mode 100644 util/src/network/tests.rs diff --git a/cov.sh b/cov.sh index 9f2a87a47..371746a39 100755 --- a/cov.sh +++ b/cov.sh @@ -17,5 +17,5 @@ fi cargo test --no-run || exit $? mkdir -p target/coverage -kcov --exclude-pattern ~/.multirust --include-pattern src --verify target/coverage target/debug/ethcore* +kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1 --include-pattern src --verify target/coverage target/debug/ethcore* xdg-open target/coverage/index.html diff --git a/src/bin/client/main.rs b/src/bin/client/main.rs index e2892693c..ff81aae31 100644 --- a/src/bin/client/main.rs +++ b/src/bin/client/main.rs @@ -33,7 +33,7 @@ fn main() { let mut net_settings = NetworkConfiguration::new(); let args: Vec<_> = env::args().collect(); if args.len() == 2 { - net_settings.boot_nodes.push(args[1].trim_matches('\"').to_string()); + net_settings.boot_nodes = Some(vec! [args[1].trim_matches('\"').to_string()]); } let mut service = ClientService::start(spec, net_settings).unwrap(); diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index fb7bfb734..7ed8c3c18 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::collections::VecDeque; use mio::{Handler, Token, EventSet, EventLoop, PollOpt, TryRead, TryWrite}; use mio::tcp::*; @@ -10,6 +11,7 @@ use error::*; use io::{IoContext, StreamToken}; use network::error::NetworkError; use network::handshake::Handshake; +use network::stats::NetworkStats; use crypto; use rcrypto::blockmodes::*; use rcrypto::aessafe::*; @@ -34,6 +36,8 @@ pub struct Connection { send_queue: VecDeque>, /// Event flags this connection expects interest: EventSet, + /// Shared network staistics + stats: Arc, } /// Connection write status. @@ -47,7 +51,7 @@ pub enum WriteStatus { impl Connection { /// Create a new connection with given id and socket. - pub fn new(token: StreamToken, socket: TcpStream) -> Connection { + pub fn new(token: StreamToken, socket: TcpStream, stats: Arc) -> Connection { Connection { token: token, socket: socket, @@ -55,6 +59,7 @@ impl Connection { rec_buf: Bytes::new(), rec_size: 0, interest: EventSet::hup() | EventSet::readable(), + stats: stats, } } @@ -68,7 +73,6 @@ impl Connection { } /// Readable IO handler. Called when there is some data to be read. - //TODO: return a slice pub fn readable(&mut self) -> io::Result> { if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size { warn!(target:"net", "Unexpected connection read"); @@ -77,9 +81,12 @@ impl Connection { // resolve "multiple applicable items in scope [E0034]" error let sock_ref = ::by_ref(&mut self.socket); match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) { - Ok(Some(_)) if self.rec_buf.len() == self.rec_size => { - self.rec_size = 0; - Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new()))) + Ok(Some(size)) if size != 0 => { + self.stats.inc_recv(size); + if self.rec_size != 0 && self.rec_buf.len() == self.rec_size { + self.rec_size = 0; + Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new()))) + } else { Ok(None) } }, Ok(_) => Ok(None), Err(e) => Err(e), @@ -109,14 +116,17 @@ impl Connection { return Ok(WriteStatus::Complete) } match self.socket.try_write_buf(buf) { - Ok(_) if (buf.position() as usize) < send_size => { + Ok(Some(size)) if (buf.position() as usize) < send_size => { self.interest.insert(EventSet::writable()); + self.stats.inc_send(size); Ok(WriteStatus::Ongoing) }, - Ok(_) if (buf.position() as usize) == send_size => { + Ok(Some(size)) if (buf.position() as usize) == send_size => { + self.stats.inc_send(size); Ok(WriteStatus::Complete) }, - Ok(_) => { panic!("Wrote past buffer");}, + Ok(Some(_)) => { panic!("Wrote past buffer");}, + Ok(None) => Ok(WriteStatus::Ongoing), Err(e) => Err(e) } }.and_then(|r| { diff --git a/util/src/network/handshake.rs b/util/src/network/handshake.rs index acac77d04..9b835d5bd 100644 --- a/util/src/network/handshake.rs +++ b/util/src/network/handshake.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use mio::*; use mio::tcp::*; use hash::*; @@ -10,6 +11,7 @@ use network::host::{HostInfo}; use network::node::NodeId; use error::*; use network::error::NetworkError; +use network::stats::NetworkStats; use io::{IoContext, StreamToken}; #[derive(PartialEq, Eq, Debug)] @@ -54,10 +56,10 @@ const HANDSHAKE_TIMEOUT: u64 = 30000; impl Handshake { /// Create a new handshake object - pub fn new(token: StreamToken, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result { + pub fn new(token: StreamToken, id: Option<&NodeId>, socket: TcpStream, nonce: &H256, stats: Arc) -> Result { Ok(Handshake { - id: id.clone(), - connection: Connection::new(token, socket), + id: if let Some(id) = id { id.clone()} else { NodeId::new() }, + connection: Connection::new(token, socket, stats), originated: false, state: HandshakeState::New, ecdhe: try!(KeyPair::create()), @@ -143,29 +145,36 @@ impl Handshake { /// Parse, validate and confirm auth message fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> { trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); - assert!(data.len() == AUTH_PACKET_SIZE); + if data.len() != AUTH_PACKET_SIZE { + debug!(target:"net", "Wrong auth packet size"); + return Err(From::from(NetworkError::BadProtocol)); + } self.auth_cipher = data.to_vec(); let auth = try!(ecies::decrypt(host.secret(), data)); let (sig, rest) = auth.split_at(65); let (hepubk, rest) = rest.split_at(32); let (pubk, rest) = rest.split_at(64); let (nonce, _) = rest.split_at(32); - self.remote_public.clone_from_slice(pubk); + self.id.clone_from_slice(pubk); self.remote_nonce.clone_from_slice(nonce); - let shared = try!(ecdh::agree(host.secret(), &self.remote_public)); + let shared = try!(ecdh::agree(host.secret(), &self.id)); let signature = Signature::from_slice(sig); let spub = try!(ec::recover(&signature, &(&shared ^ &self.remote_nonce))); + self.remote_public = spub.clone(); if &spub.sha3()[..] != hepubk { trace!(target:"net", "Handshake hash mismath with {:?}", self.connection.socket.peer_addr()); return Err(From::from(NetworkError::Auth)); }; - self.write_ack() + Ok(()) } /// Parse and validate ack message fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> { trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); - assert!(data.len() == ACK_PACKET_SIZE); + if data.len() != ACK_PACKET_SIZE { + debug!(target:"net", "Wrong ack packet size"); + return Err(From::from(NetworkError::BadProtocol)); + } self.ack_cipher = data.to_vec(); let ack = try!(ecies::decrypt(host.secret(), data)); self.remote_public.clone_from_slice(&ack[0..64]); diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 8fffdde69..67338b83f 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -17,6 +17,7 @@ use error::*; use io::*; use network::NetworkProtocolHandler; use network::node::*; +use network::stats::NetworkStats; type Slab = ::slab::Slab; @@ -41,7 +42,9 @@ pub struct NetworkConfiguration { /// Pin to boot nodes only pub pin: bool, /// List of initial node addresses - pub boot_nodes: Vec, + pub boot_nodes: Option>, + /// Use provided node key instead of default + pub use_secret: Option, } impl NetworkConfiguration { @@ -53,9 +56,18 @@ impl NetworkConfiguration { nat_enabled: true, discovery_enabled: true, pin: false, - boot_nodes: Vec::new(), + boot_nodes: None, + use_secret: None, } } + + /// Create new default configuration with sepcified listen port. + pub fn new_with_port(port: u16) -> NetworkConfiguration { + let mut config = NetworkConfiguration::new(); + config.listen_address = SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap(); + config.public_address = SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap(); + config + } } // Tokens @@ -253,6 +265,7 @@ pub struct Host where Message: Send + Sync + Clone { handlers: RwLock>>>, timers: RwLock>, timer_counter: RwLock, + stats: Arc, } impl Host where Message: Send + Sync + Clone { @@ -264,7 +277,7 @@ impl Host where Message: Send + Sync + Clone { let udp_socket = UdpSocket::bound(&addr).unwrap(); let mut host = Host:: { info: RwLock::new(HostInfo { - keys: KeyPair::create().unwrap(), + keys: if let Some(ref secret) = config.use_secret { KeyPair::from_secret(secret.clone()).unwrap() } else { KeyPair::create().unwrap() }, config: config, nonce: H256::random(), protocol_version: 4, @@ -279,6 +292,7 @@ impl Host where Message: Send + Sync + Clone { handlers: RwLock::new(HashMap::new()), timers: RwLock::new(HashMap::new()), timer_counter: RwLock::new(LAST_CONNECTION + 1), + stats: Arc::new(NetworkStats::default()), }; let port = host.info.read().unwrap().config.listen_address.port(); host.info.write().unwrap().deref_mut().listen_port = port; @@ -290,21 +304,24 @@ impl Host where Message: Send + Sync + Clone { */ let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone(); - if boot_nodes.is_empty() { + if boot_nodes.is_none() { // GO bootnodes host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); // IE host.add_node("enode://de471bccee3d042261d52e9bff31458daecc406142b401d4cd848f677479f73104b9fdeb090af9583d3391b7f10cb2ba9e26865dd5fca4fcdc0fb1e3b723c786@54.94.239.50:30303"); // BR host.add_node("enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303"); // SG } else { - for n in boot_nodes { + for n in boot_nodes.unwrap() { host.add_node(&n); } } - // ETH/DEV cpp-ethereum (poc-9.ethdev.com) host } + pub fn stats(&self) -> Arc { + self.stats.clone() + } + pub fn add_node(&mut self, id: &str) { match Node::from_str(id) { Err(e) => { warn!("Could not add node: {:?}", e); }, @@ -382,7 +399,6 @@ impl Host where Message: Send + Sync + Clone { } #[allow(single_match)] - #[allow(block_in_if_condition_stmt)] fn connect_peer(&self, id: &NodeId, io: &IoContext>) { if self.have_session(id) { @@ -409,12 +425,16 @@ impl Host where Message: Send + Sync + Clone { } } }; + self.create_connection(socket, Some(id), io); + } + #[allow(block_in_if_condition_stmt)] + fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext>) { let nonce = self.info.write().unwrap().next_nonce(); let mut connections = self.connections.write().unwrap(); if connections.insert_with(|token| { - let mut handshake = Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"); - handshake.start(io, &self.info.read().unwrap(), true).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| { + let mut handshake = Handshake::new(token, id, socket, &nonce, self.stats.clone()).expect("Can't create handshake"); + handshake.start(io, &self.info.read().unwrap(), id.is_some()).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| { debug!(target: "net", "Handshake create error: {:?}", e); }); Arc::new(Mutex::new(ConnectionEntry::Handshake(handshake))) @@ -423,8 +443,20 @@ impl Host where Message: Send + Sync + Clone { } } - fn accept(&self, _io: &IoContext>) { + fn accept(&self, io: &IoContext>) { trace!(target: "net", "accept"); + loop { + let socket = match self.tcp_listener.lock().unwrap().accept() { + Ok(None) => break, + Ok(Some((sock, _addr))) => sock, + Err(e) => { + warn!("Error accepting connection: {:?}", e); + break + }, + }; + self.create_connection(socket, None, io); + } + io.update_registration(TCP_ACCEPT).expect("Error registering TCP listener"); } #[allow(single_match)] @@ -539,6 +571,7 @@ impl Host where Message: Send + Sync + Clone { ConnectionEntry::Handshake(h) => { let session = Session::new(h, io, &self.info.read().unwrap()).expect("Session creation error"); io.update_registration(token).expect("Error updating session registration"); + self.stats.inc_sessions(); Some(Arc::new(Mutex::new(ConnectionEntry::Session(session)))) }, _ => { None } // handshake expired diff --git a/util/src/network/mod.rs b/util/src/network/mod.rs index c175ab0a2..668cdc8b1 100644 --- a/util/src/network/mod.rs +++ b/util/src/network/mod.rs @@ -1,5 +1,4 @@ -/// Network and general IO module. -/// +/// Network and general IO module. /// Example usage for craeting a network service and adding an IO handler: /// /// ```rust @@ -56,22 +55,20 @@ mod discovery; mod service; mod error; mod node; +mod stats; + +#[cfg(test)] +mod tests; -/// TODO [arkpar] Please document me pub use network::host::PeerId; -/// TODO [arkpar] Please document me pub use network::host::PacketId; -/// TODO [arkpar] Please document me pub use network::host::NetworkContext; -/// TODO [arkpar] Please document me pub use network::service::NetworkService; -/// TODO [arkpar] Please document me pub use network::host::NetworkIoMessage; -/// TODO [arkpar] Please document me pub use network::host::NetworkIoMessage::User as UserMessage; -/// TODO [arkpar] Please document me pub use network::error::NetworkError; pub use network::host::NetworkConfiguration; +pub use network::stats::NetworkStats; use io::TimerToken; @@ -93,44 +90,3 @@ pub trait NetworkProtocolHandler: Sync + Send where Message: Send + Syn fn message(&self, _io: &NetworkContext, _message: &Message) {} } - -#[test] -fn test_net_service() { - - use std::sync::Arc; - struct MyHandler; - - #[derive(Clone)] - struct MyMessage { - data: u32 - } - - impl NetworkProtocolHandler for MyHandler { - fn initialize(&self, io: &NetworkContext) { - io.register_timer(0, 1000).unwrap(); - } - - fn read(&self, _io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { - println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer); - } - - fn connected(&self, _io: &NetworkContext, peer: &PeerId) { - println!("Connected {}", peer); - } - - fn disconnected(&self, _io: &NetworkContext, peer: &PeerId) { - println!("Disconnected {}", peer); - } - - fn timeout(&self, _io: &NetworkContext, timer: TimerToken) { - println!("Timeout {}", timer); - } - - fn message(&self, _io: &NetworkContext, message: &MyMessage) { - println!("Message {}", message.data); - } - } - - let mut service = NetworkService::::start(NetworkConfiguration::new()).expect("Error creating network service"); - service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]).unwrap(); -} diff --git a/util/src/network/service.rs b/util/src/network/service.rs index 8c29a8042..cbf400872 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -3,6 +3,7 @@ use error::*; use network::{NetworkProtocolHandler, NetworkConfiguration}; use network::error::{NetworkError}; use network::host::{Host, NetworkIoMessage, ProtocolId}; +use network::stats::{NetworkStats}; use io::*; /// IO Service with networking @@ -10,6 +11,7 @@ use io::*; pub struct NetworkService where Message: Send + Sync + Clone + 'static { io_service: IoService>, host_info: String, + stats: Arc } impl NetworkService where Message: Send + Sync + Clone + 'static { @@ -17,12 +19,14 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat pub fn start(config: NetworkConfiguration) -> Result, UtilError> { let mut io_service = try!(IoService::>::start()); let host = Arc::new(Host::new(config)); + let stats = host.stats().clone(); let host_info = host.client_version(); info!("NetworkService::start(): id={:?}", host.client_id()); try!(io_service.register_handler(host)); Ok(NetworkService { io_service: io_service, host_info: host_info, + stats: stats, }) } @@ -45,5 +49,10 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat pub fn io(&mut self) -> &mut IoService> { &mut self.io_service } + + /// Returns underlying io service. + pub fn stats(&self) -> &NetworkStats { + &self.stats + } } diff --git a/util/src/network/stats.rs b/util/src/network/stats.rs new file mode 100644 index 000000000..02d904985 --- /dev/null +++ b/util/src/network/stats.rs @@ -0,0 +1,51 @@ +//! Network Statistics +use std::sync::atomic::*; + +/// Network statistics structure +#[derive(Default, Debug)] +pub struct NetworkStats { + /// Bytes received + recv: AtomicUsize, + /// Bytes sent + send: AtomicUsize, + /// Total number of sessions created + sessions: AtomicUsize, +} + +impl NetworkStats { + /// Increase bytes received. + #[inline] + pub fn inc_recv(&self, size: usize) { + self.recv.fetch_add(size, Ordering::Relaxed); + } + + /// Increase bytes sent. + #[inline] + pub fn inc_send(&self, size: usize) { + self.send.fetch_add(size, Ordering::Relaxed); + } + + /// Increase number of sessions. + #[inline] + pub fn inc_sessions(&self) { + self.sessions.fetch_add(1, Ordering::Relaxed); + } + + /// Get bytes sent. + #[inline] + pub fn send(&self) -> usize { + self.send.load(Ordering::Relaxed) + } + + /// Get bytes received. + #[inline] + pub fn recv(&self) -> usize { + self.recv.load(Ordering::Relaxed) + } + + /// Get total number of sessions created. + #[inline] + pub fn sessions(&self) -> usize { + self.sessions.load(Ordering::Relaxed) + } +} diff --git a/util/src/network/tests.rs b/util/src/network/tests.rs new file mode 100644 index 000000000..7b0870532 --- /dev/null +++ b/util/src/network/tests.rs @@ -0,0 +1,103 @@ +use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; +use std::thread; +use std::time::*; +use common::*; +use network::*; +use io::TimerToken; +use crypto::KeyPair; + +pub struct TestProtocol { + pub packet: Mutex, + pub got_timeout: AtomicBool, +} + +impl Default for TestProtocol { + fn default() -> Self { + TestProtocol { + packet: Mutex::new(Vec::new()), + got_timeout: AtomicBool::new(false), + } + } +} + +#[derive(Clone)] +pub struct TestProtocolMessage { + payload: u32, +} + +impl TestProtocol { + /// Creates and register protocol with the network service + pub fn register(service: &mut NetworkService) -> Arc { + let handler = Arc::new(TestProtocol::default()); + service.register_protocol(handler.clone(), "test", &[42u8, 43u8]).expect("Error registering test protocol handler"); + handler + } + + pub fn got_packet(&self) -> bool { + self.packet.lock().unwrap().deref()[..] == b"hello"[..] + } + + pub fn got_timeout(&self) -> bool { + self.got_timeout.load(AtomicOrdering::Relaxed) + } +} + +impl NetworkProtocolHandler for TestProtocol { + fn initialize(&self, io: &NetworkContext) { + io.register_timer(0, 10).unwrap(); + } + + fn read(&self, _io: &NetworkContext, _peer: &PeerId, packet_id: u8, data: &[u8]) { + assert_eq!(packet_id, 33); + self.packet.lock().unwrap().extend(data); + } + + fn connected(&self, io: &NetworkContext, _peer: &PeerId) { + io.respond(33, "hello".to_owned().into_bytes()).unwrap(); + } + + fn disconnected(&self, _io: &NetworkContext, _peer: &PeerId) { + } + + /// Timer function called after a timeout created with `NetworkContext::timeout`. + fn timeout(&self, _io: &NetworkContext, timer: TimerToken) { + assert_eq!(timer, 0); + self.got_timeout.store(true, AtomicOrdering::Relaxed); + } +} + + +#[test] +fn test_net_service() { + let mut service = NetworkService::::start(NetworkConfiguration::new()).expect("Error creating network service"); + service.register_protocol(Arc::new(TestProtocol::default()), "myproto", &[1u8]).unwrap(); +} + +#[test] +fn test_net_connect() { + let key1 = KeyPair::create().unwrap(); + let mut config1 = NetworkConfiguration::new_with_port(30344); + config1.use_secret = Some(key1.secret().clone()); + config1.boot_nodes = Some(vec![ ]); + let mut config2 = NetworkConfiguration::new_with_port(30345); + config2.boot_nodes = Some(vec![ format!("enode://{}@127.0.0.1:30344", key1.public().hex()) ]); + let mut service1 = NetworkService::::start(config1).unwrap(); + let mut service2 = NetworkService::::start(config2).unwrap(); + let handler1 = TestProtocol::register(&mut service1); + let handler2 = TestProtocol::register(&mut service2); + while !handler1.got_packet() && !handler2.got_packet() { + thread::sleep(Duration::from_millis(50)); + } + assert!(service1.stats().sessions() >= 1); + assert!(service2.stats().sessions() >= 1); +} + +#[test] +fn test_net_timeout() { + let config = NetworkConfiguration::new_with_port(30346); + let mut service = NetworkService::::start(config).unwrap(); + let handler = TestProtocol::register(&mut service); + while !handler.got_timeout() { + thread::sleep(Duration::from_millis(50)); + } +}