Incoming connections; Tests

This commit is contained in:
arkpar 2016-01-24 18:53:54 +01:00
parent cbc4828eea
commit cd250d4959
9 changed files with 249 additions and 78 deletions

2
cov.sh
View File

@ -17,5 +17,5 @@ fi
cargo test --no-run || exit $? cargo test --no-run || exit $?
mkdir -p target/coverage mkdir -p target/coverage
kcov --exclude-pattern ~/.multirust --include-pattern src --verify target/coverage target/debug/ethcore* kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1 --include-pattern src --verify target/coverage target/debug/ethcore*
xdg-open target/coverage/index.html xdg-open target/coverage/index.html

View File

@ -33,7 +33,7 @@ fn main() {
let mut net_settings = NetworkConfiguration::new(); let mut net_settings = NetworkConfiguration::new();
let args: Vec<_> = env::args().collect(); let args: Vec<_> = env::args().collect();
if args.len() == 2 { if args.len() == 2 {
net_settings.boot_nodes.push(args[1].trim_matches('\"').to_string()); net_settings.boot_nodes = Some(vec! [args[1].trim_matches('\"').to_string()]);
} }
let mut service = ClientService::start(spec, net_settings).unwrap(); let mut service = ClientService::start(spec, net_settings).unwrap();

View File

@ -1,3 +1,4 @@
use std::sync::Arc;
use std::collections::VecDeque; use std::collections::VecDeque;
use mio::{Handler, Token, EventSet, EventLoop, PollOpt, TryRead, TryWrite}; use mio::{Handler, Token, EventSet, EventLoop, PollOpt, TryRead, TryWrite};
use mio::tcp::*; use mio::tcp::*;
@ -10,6 +11,7 @@ use error::*;
use io::{IoContext, StreamToken}; use io::{IoContext, StreamToken};
use network::error::NetworkError; use network::error::NetworkError;
use network::handshake::Handshake; use network::handshake::Handshake;
use network::stats::NetworkStats;
use crypto; use crypto;
use rcrypto::blockmodes::*; use rcrypto::blockmodes::*;
use rcrypto::aessafe::*; use rcrypto::aessafe::*;
@ -34,6 +36,8 @@ pub struct Connection {
send_queue: VecDeque<Cursor<Bytes>>, send_queue: VecDeque<Cursor<Bytes>>,
/// Event flags this connection expects /// Event flags this connection expects
interest: EventSet, interest: EventSet,
/// Shared network staistics
stats: Arc<NetworkStats>,
} }
/// Connection write status. /// Connection write status.
@ -47,7 +51,7 @@ pub enum WriteStatus {
impl Connection { impl Connection {
/// Create a new connection with given id and socket. /// Create a new connection with given id and socket.
pub fn new(token: StreamToken, socket: TcpStream) -> Connection { pub fn new(token: StreamToken, socket: TcpStream, stats: Arc<NetworkStats>) -> Connection {
Connection { Connection {
token: token, token: token,
socket: socket, socket: socket,
@ -55,6 +59,7 @@ impl Connection {
rec_buf: Bytes::new(), rec_buf: Bytes::new(),
rec_size: 0, rec_size: 0,
interest: EventSet::hup() | EventSet::readable(), interest: EventSet::hup() | EventSet::readable(),
stats: stats,
} }
} }
@ -68,7 +73,6 @@ impl Connection {
} }
/// Readable IO handler. Called when there is some data to be read. /// Readable IO handler. Called when there is some data to be read.
//TODO: return a slice
pub fn readable(&mut self) -> io::Result<Option<Bytes>> { pub fn readable(&mut self) -> io::Result<Option<Bytes>> {
if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size { if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size {
warn!(target:"net", "Unexpected connection read"); warn!(target:"net", "Unexpected connection read");
@ -77,9 +81,12 @@ impl Connection {
// resolve "multiple applicable items in scope [E0034]" error // resolve "multiple applicable items in scope [E0034]" error
let sock_ref = <TcpStream as Read>::by_ref(&mut self.socket); let sock_ref = <TcpStream as Read>::by_ref(&mut self.socket);
match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) { match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) {
Ok(Some(_)) if self.rec_buf.len() == self.rec_size => { Ok(Some(size)) if size != 0 => {
self.stats.inc_recv(size);
if self.rec_size != 0 && self.rec_buf.len() == self.rec_size {
self.rec_size = 0; self.rec_size = 0;
Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new()))) Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new())))
} else { Ok(None) }
}, },
Ok(_) => Ok(None), Ok(_) => Ok(None),
Err(e) => Err(e), Err(e) => Err(e),
@ -109,14 +116,17 @@ impl Connection {
return Ok(WriteStatus::Complete) return Ok(WriteStatus::Complete)
} }
match self.socket.try_write_buf(buf) { match self.socket.try_write_buf(buf) {
Ok(_) if (buf.position() as usize) < send_size => { Ok(Some(size)) if (buf.position() as usize) < send_size => {
self.interest.insert(EventSet::writable()); self.interest.insert(EventSet::writable());
self.stats.inc_send(size);
Ok(WriteStatus::Ongoing) Ok(WriteStatus::Ongoing)
}, },
Ok(_) if (buf.position() as usize) == send_size => { Ok(Some(size)) if (buf.position() as usize) == send_size => {
self.stats.inc_send(size);
Ok(WriteStatus::Complete) Ok(WriteStatus::Complete)
}, },
Ok(_) => { panic!("Wrote past buffer");}, Ok(Some(_)) => { panic!("Wrote past buffer");},
Ok(None) => Ok(WriteStatus::Ongoing),
Err(e) => Err(e) Err(e) => Err(e)
} }
}.and_then(|r| { }.and_then(|r| {

View File

@ -1,3 +1,4 @@
use std::sync::Arc;
use mio::*; use mio::*;
use mio::tcp::*; use mio::tcp::*;
use hash::*; use hash::*;
@ -10,6 +11,7 @@ use network::host::{HostInfo};
use network::node::NodeId; use network::node::NodeId;
use error::*; use error::*;
use network::error::NetworkError; use network::error::NetworkError;
use network::stats::NetworkStats;
use io::{IoContext, StreamToken}; use io::{IoContext, StreamToken};
#[derive(PartialEq, Eq, Debug)] #[derive(PartialEq, Eq, Debug)]
@ -54,10 +56,10 @@ const HANDSHAKE_TIMEOUT: u64 = 30000;
impl Handshake { impl Handshake {
/// Create a new handshake object /// Create a new handshake object
pub fn new(token: StreamToken, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result<Handshake, UtilError> { pub fn new(token: StreamToken, id: Option<&NodeId>, socket: TcpStream, nonce: &H256, stats: Arc<NetworkStats>) -> Result<Handshake, UtilError> {
Ok(Handshake { Ok(Handshake {
id: id.clone(), id: if let Some(id) = id { id.clone()} else { NodeId::new() },
connection: Connection::new(token, socket), connection: Connection::new(token, socket, stats),
originated: false, originated: false,
state: HandshakeState::New, state: HandshakeState::New,
ecdhe: try!(KeyPair::create()), ecdhe: try!(KeyPair::create()),
@ -143,29 +145,36 @@ impl Handshake {
/// Parse, validate and confirm auth message /// Parse, validate and confirm auth message
fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> { fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
assert!(data.len() == AUTH_PACKET_SIZE); if data.len() != AUTH_PACKET_SIZE {
debug!(target:"net", "Wrong auth packet size");
return Err(From::from(NetworkError::BadProtocol));
}
self.auth_cipher = data.to_vec(); self.auth_cipher = data.to_vec();
let auth = try!(ecies::decrypt(host.secret(), data)); let auth = try!(ecies::decrypt(host.secret(), data));
let (sig, rest) = auth.split_at(65); let (sig, rest) = auth.split_at(65);
let (hepubk, rest) = rest.split_at(32); let (hepubk, rest) = rest.split_at(32);
let (pubk, rest) = rest.split_at(64); let (pubk, rest) = rest.split_at(64);
let (nonce, _) = rest.split_at(32); let (nonce, _) = rest.split_at(32);
self.remote_public.clone_from_slice(pubk); self.id.clone_from_slice(pubk);
self.remote_nonce.clone_from_slice(nonce); self.remote_nonce.clone_from_slice(nonce);
let shared = try!(ecdh::agree(host.secret(), &self.remote_public)); let shared = try!(ecdh::agree(host.secret(), &self.id));
let signature = Signature::from_slice(sig); let signature = Signature::from_slice(sig);
let spub = try!(ec::recover(&signature, &(&shared ^ &self.remote_nonce))); let spub = try!(ec::recover(&signature, &(&shared ^ &self.remote_nonce)));
self.remote_public = spub.clone();
if &spub.sha3()[..] != hepubk { if &spub.sha3()[..] != hepubk {
trace!(target:"net", "Handshake hash mismath with {:?}", self.connection.socket.peer_addr()); trace!(target:"net", "Handshake hash mismath with {:?}", self.connection.socket.peer_addr());
return Err(From::from(NetworkError::Auth)); return Err(From::from(NetworkError::Auth));
}; };
self.write_ack() Ok(())
} }
/// Parse and validate ack message /// Parse and validate ack message
fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> { fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
assert!(data.len() == ACK_PACKET_SIZE); if data.len() != ACK_PACKET_SIZE {
debug!(target:"net", "Wrong ack packet size");
return Err(From::from(NetworkError::BadProtocol));
}
self.ack_cipher = data.to_vec(); self.ack_cipher = data.to_vec();
let ack = try!(ecies::decrypt(host.secret(), data)); let ack = try!(ecies::decrypt(host.secret(), data));
self.remote_public.clone_from_slice(&ack[0..64]); self.remote_public.clone_from_slice(&ack[0..64]);

View File

@ -17,6 +17,7 @@ use error::*;
use io::*; use io::*;
use network::NetworkProtocolHandler; use network::NetworkProtocolHandler;
use network::node::*; use network::node::*;
use network::stats::NetworkStats;
type Slab<T> = ::slab::Slab<T, usize>; type Slab<T> = ::slab::Slab<T, usize>;
@ -41,7 +42,9 @@ pub struct NetworkConfiguration {
/// Pin to boot nodes only /// Pin to boot nodes only
pub pin: bool, pub pin: bool,
/// List of initial node addresses /// List of initial node addresses
pub boot_nodes: Vec<String>, pub boot_nodes: Option<Vec<String>>,
/// Use provided node key instead of default
pub use_secret: Option<Secret>,
} }
impl NetworkConfiguration { impl NetworkConfiguration {
@ -53,9 +56,18 @@ impl NetworkConfiguration {
nat_enabled: true, nat_enabled: true,
discovery_enabled: true, discovery_enabled: true,
pin: false, pin: false,
boot_nodes: Vec::new(), boot_nodes: None,
use_secret: None,
} }
} }
/// Create new default configuration with sepcified listen port.
pub fn new_with_port(port: u16) -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap();
config.public_address = SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap();
config
}
} }
// Tokens // Tokens
@ -253,6 +265,7 @@ pub struct Host<Message> where Message: Send + Sync + Clone {
handlers: RwLock<HashMap<ProtocolId, Arc<NetworkProtocolHandler<Message>>>>, handlers: RwLock<HashMap<ProtocolId, Arc<NetworkProtocolHandler<Message>>>>,
timers: RwLock<HashMap<TimerToken, ProtocolTimer>>, timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
timer_counter: RwLock<usize>, timer_counter: RwLock<usize>,
stats: Arc<NetworkStats>,
} }
impl<Message> Host<Message> where Message: Send + Sync + Clone { impl<Message> Host<Message> where Message: Send + Sync + Clone {
@ -264,7 +277,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let udp_socket = UdpSocket::bound(&addr).unwrap(); let udp_socket = UdpSocket::bound(&addr).unwrap();
let mut host = Host::<Message> { let mut host = Host::<Message> {
info: RwLock::new(HostInfo { info: RwLock::new(HostInfo {
keys: KeyPair::create().unwrap(), keys: if let Some(ref secret) = config.use_secret { KeyPair::from_secret(secret.clone()).unwrap() } else { KeyPair::create().unwrap() },
config: config, config: config,
nonce: H256::random(), nonce: H256::random(),
protocol_version: 4, protocol_version: 4,
@ -279,6 +292,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
handlers: RwLock::new(HashMap::new()), handlers: RwLock::new(HashMap::new()),
timers: RwLock::new(HashMap::new()), timers: RwLock::new(HashMap::new()),
timer_counter: RwLock::new(LAST_CONNECTION + 1), timer_counter: RwLock::new(LAST_CONNECTION + 1),
stats: Arc::new(NetworkStats::default()),
}; };
let port = host.info.read().unwrap().config.listen_address.port(); let port = host.info.read().unwrap().config.listen_address.port();
host.info.write().unwrap().deref_mut().listen_port = port; host.info.write().unwrap().deref_mut().listen_port = port;
@ -290,21 +304,24 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
*/ */
let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone(); let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
if boot_nodes.is_empty() { if boot_nodes.is_none() {
// GO bootnodes // GO bootnodes
host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); // IE host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); // IE
host.add_node("enode://de471bccee3d042261d52e9bff31458daecc406142b401d4cd848f677479f73104b9fdeb090af9583d3391b7f10cb2ba9e26865dd5fca4fcdc0fb1e3b723c786@54.94.239.50:30303"); // BR host.add_node("enode://de471bccee3d042261d52e9bff31458daecc406142b401d4cd848f677479f73104b9fdeb090af9583d3391b7f10cb2ba9e26865dd5fca4fcdc0fb1e3b723c786@54.94.239.50:30303"); // BR
host.add_node("enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303"); // SG host.add_node("enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303"); // SG
} }
else { else {
for n in boot_nodes { for n in boot_nodes.unwrap() {
host.add_node(&n); host.add_node(&n);
} }
} }
// ETH/DEV cpp-ethereum (poc-9.ethdev.com)
host host
} }
pub fn stats(&self) -> Arc<NetworkStats> {
self.stats.clone()
}
pub fn add_node(&mut self, id: &str) { pub fn add_node(&mut self, id: &str) {
match Node::from_str(id) { match Node::from_str(id) {
Err(e) => { warn!("Could not add node: {:?}", e); }, Err(e) => { warn!("Could not add node: {:?}", e); },
@ -382,7 +399,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
#[allow(single_match)] #[allow(single_match)]
#[allow(block_in_if_condition_stmt)]
fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage<Message>>) { fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage<Message>>) {
if self.have_session(id) if self.have_session(id)
{ {
@ -409,12 +425,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
} }
}; };
self.create_connection(socket, Some(id), io);
}
#[allow(block_in_if_condition_stmt)]
fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext<NetworkIoMessage<Message>>) {
let nonce = self.info.write().unwrap().next_nonce(); let nonce = self.info.write().unwrap().next_nonce();
let mut connections = self.connections.write().unwrap(); let mut connections = self.connections.write().unwrap();
if connections.insert_with(|token| { if connections.insert_with(|token| {
let mut handshake = Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"); let mut handshake = Handshake::new(token, id, socket, &nonce, self.stats.clone()).expect("Can't create handshake");
handshake.start(io, &self.info.read().unwrap(), true).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| { handshake.start(io, &self.info.read().unwrap(), id.is_some()).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| {
debug!(target: "net", "Handshake create error: {:?}", e); debug!(target: "net", "Handshake create error: {:?}", e);
}); });
Arc::new(Mutex::new(ConnectionEntry::Handshake(handshake))) Arc::new(Mutex::new(ConnectionEntry::Handshake(handshake)))
@ -423,8 +443,20 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
} }
fn accept(&self, _io: &IoContext<NetworkIoMessage<Message>>) { fn accept(&self, io: &IoContext<NetworkIoMessage<Message>>) {
trace!(target: "net", "accept"); trace!(target: "net", "accept");
loop {
let socket = match self.tcp_listener.lock().unwrap().accept() {
Ok(None) => break,
Ok(Some((sock, _addr))) => sock,
Err(e) => {
warn!("Error accepting connection: {:?}", e);
break
},
};
self.create_connection(socket, None, io);
}
io.update_registration(TCP_ACCEPT).expect("Error registering TCP listener");
} }
#[allow(single_match)] #[allow(single_match)]
@ -539,6 +571,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
ConnectionEntry::Handshake(h) => { ConnectionEntry::Handshake(h) => {
let session = Session::new(h, io, &self.info.read().unwrap()).expect("Session creation error"); let session = Session::new(h, io, &self.info.read().unwrap()).expect("Session creation error");
io.update_registration(token).expect("Error updating session registration"); io.update_registration(token).expect("Error updating session registration");
self.stats.inc_sessions();
Some(Arc::new(Mutex::new(ConnectionEntry::Session(session)))) Some(Arc::new(Mutex::new(ConnectionEntry::Session(session))))
}, },
_ => { None } // handshake expired _ => { None } // handshake expired

View File

@ -1,5 +1,4 @@
/// Network and general IO module. /// Network and general IO module.
///
/// Example usage for craeting a network service and adding an IO handler: /// Example usage for craeting a network service and adding an IO handler:
/// ///
/// ```rust /// ```rust
@ -56,22 +55,20 @@ mod discovery;
mod service; mod service;
mod error; mod error;
mod node; mod node;
mod stats;
#[cfg(test)]
mod tests;
/// TODO [arkpar] Please document me
pub use network::host::PeerId; pub use network::host::PeerId;
/// TODO [arkpar] Please document me
pub use network::host::PacketId; pub use network::host::PacketId;
/// TODO [arkpar] Please document me
pub use network::host::NetworkContext; pub use network::host::NetworkContext;
/// TODO [arkpar] Please document me
pub use network::service::NetworkService; pub use network::service::NetworkService;
/// TODO [arkpar] Please document me
pub use network::host::NetworkIoMessage; pub use network::host::NetworkIoMessage;
/// TODO [arkpar] Please document me
pub use network::host::NetworkIoMessage::User as UserMessage; pub use network::host::NetworkIoMessage::User as UserMessage;
/// TODO [arkpar] Please document me
pub use network::error::NetworkError; pub use network::error::NetworkError;
pub use network::host::NetworkConfiguration; pub use network::host::NetworkConfiguration;
pub use network::stats::NetworkStats;
use io::TimerToken; use io::TimerToken;
@ -93,44 +90,3 @@ pub trait NetworkProtocolHandler<Message>: Sync + Send where Message: Send + Syn
fn message(&self, _io: &NetworkContext<Message>, _message: &Message) {} fn message(&self, _io: &NetworkContext<Message>, _message: &Message) {}
} }
#[test]
fn test_net_service() {
use std::sync::Arc;
struct MyHandler;
#[derive(Clone)]
struct MyMessage {
data: u32
}
impl NetworkProtocolHandler<MyMessage> for MyHandler {
fn initialize(&self, io: &NetworkContext<MyMessage>) {
io.register_timer(0, 1000).unwrap();
}
fn read(&self, _io: &NetworkContext<MyMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer);
}
fn connected(&self, _io: &NetworkContext<MyMessage>, peer: &PeerId) {
println!("Connected {}", peer);
}
fn disconnected(&self, _io: &NetworkContext<MyMessage>, peer: &PeerId) {
println!("Disconnected {}", peer);
}
fn timeout(&self, _io: &NetworkContext<MyMessage>, timer: TimerToken) {
println!("Timeout {}", timer);
}
fn message(&self, _io: &NetworkContext<MyMessage>, message: &MyMessage) {
println!("Message {}", message.data);
}
}
let mut service = NetworkService::<MyMessage>::start(NetworkConfiguration::new()).expect("Error creating network service");
service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]).unwrap();
}

View File

@ -3,6 +3,7 @@ use error::*;
use network::{NetworkProtocolHandler, NetworkConfiguration}; use network::{NetworkProtocolHandler, NetworkConfiguration};
use network::error::{NetworkError}; use network::error::{NetworkError};
use network::host::{Host, NetworkIoMessage, ProtocolId}; use network::host::{Host, NetworkIoMessage, ProtocolId};
use network::stats::{NetworkStats};
use io::*; use io::*;
/// IO Service with networking /// IO Service with networking
@ -10,6 +11,7 @@ use io::*;
pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static { pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static {
io_service: IoService<NetworkIoMessage<Message>>, io_service: IoService<NetworkIoMessage<Message>>,
host_info: String, host_info: String,
stats: Arc<NetworkStats>
} }
impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'static { impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'static {
@ -17,12 +19,14 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
pub fn start(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> { pub fn start(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> {
let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start()); let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
let host = Arc::new(Host::new(config)); let host = Arc::new(Host::new(config));
let stats = host.stats().clone();
let host_info = host.client_version(); let host_info = host.client_version();
info!("NetworkService::start(): id={:?}", host.client_id()); info!("NetworkService::start(): id={:?}", host.client_id());
try!(io_service.register_handler(host)); try!(io_service.register_handler(host));
Ok(NetworkService { Ok(NetworkService {
io_service: io_service, io_service: io_service,
host_info: host_info, host_info: host_info,
stats: stats,
}) })
} }
@ -45,5 +49,10 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
pub fn io(&mut self) -> &mut IoService<NetworkIoMessage<Message>> { pub fn io(&mut self) -> &mut IoService<NetworkIoMessage<Message>> {
&mut self.io_service &mut self.io_service
} }
/// Returns underlying io service.
pub fn stats(&self) -> &NetworkStats {
&self.stats
}
} }

51
util/src/network/stats.rs Normal file
View File

@ -0,0 +1,51 @@
//! Network Statistics
use std::sync::atomic::*;
/// Network statistics structure
#[derive(Default, Debug)]
pub struct NetworkStats {
/// Bytes received
recv: AtomicUsize,
/// Bytes sent
send: AtomicUsize,
/// Total number of sessions created
sessions: AtomicUsize,
}
impl NetworkStats {
/// Increase bytes received.
#[inline]
pub fn inc_recv(&self, size: usize) {
self.recv.fetch_add(size, Ordering::Relaxed);
}
/// Increase bytes sent.
#[inline]
pub fn inc_send(&self, size: usize) {
self.send.fetch_add(size, Ordering::Relaxed);
}
/// Increase number of sessions.
#[inline]
pub fn inc_sessions(&self) {
self.sessions.fetch_add(1, Ordering::Relaxed);
}
/// Get bytes sent.
#[inline]
pub fn send(&self) -> usize {
self.send.load(Ordering::Relaxed)
}
/// Get bytes received.
#[inline]
pub fn recv(&self) -> usize {
self.recv.load(Ordering::Relaxed)
}
/// Get total number of sessions created.
#[inline]
pub fn sessions(&self) -> usize {
self.sessions.load(Ordering::Relaxed)
}
}

103
util/src/network/tests.rs Normal file
View File

@ -0,0 +1,103 @@
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::thread;
use std::time::*;
use common::*;
use network::*;
use io::TimerToken;
use crypto::KeyPair;
pub struct TestProtocol {
pub packet: Mutex<Bytes>,
pub got_timeout: AtomicBool,
}
impl Default for TestProtocol {
fn default() -> Self {
TestProtocol {
packet: Mutex::new(Vec::new()),
got_timeout: AtomicBool::new(false),
}
}
}
#[derive(Clone)]
pub struct TestProtocolMessage {
payload: u32,
}
impl TestProtocol {
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService<TestProtocolMessage>) -> Arc<TestProtocol> {
let handler = Arc::new(TestProtocol::default());
service.register_protocol(handler.clone(), "test", &[42u8, 43u8]).expect("Error registering test protocol handler");
handler
}
pub fn got_packet(&self) -> bool {
self.packet.lock().unwrap().deref()[..] == b"hello"[..]
}
pub fn got_timeout(&self) -> bool {
self.got_timeout.load(AtomicOrdering::Relaxed)
}
}
impl NetworkProtocolHandler<TestProtocolMessage> for TestProtocol {
fn initialize(&self, io: &NetworkContext<TestProtocolMessage>) {
io.register_timer(0, 10).unwrap();
}
fn read(&self, _io: &NetworkContext<TestProtocolMessage>, _peer: &PeerId, packet_id: u8, data: &[u8]) {
assert_eq!(packet_id, 33);
self.packet.lock().unwrap().extend(data);
}
fn connected(&self, io: &NetworkContext<TestProtocolMessage>, _peer: &PeerId) {
io.respond(33, "hello".to_owned().into_bytes()).unwrap();
}
fn disconnected(&self, _io: &NetworkContext<TestProtocolMessage>, _peer: &PeerId) {
}
/// Timer function called after a timeout created with `NetworkContext::timeout`.
fn timeout(&self, _io: &NetworkContext<TestProtocolMessage>, timer: TimerToken) {
assert_eq!(timer, 0);
self.got_timeout.store(true, AtomicOrdering::Relaxed);
}
}
#[test]
fn test_net_service() {
let mut service = NetworkService::<TestProtocolMessage>::start(NetworkConfiguration::new()).expect("Error creating network service");
service.register_protocol(Arc::new(TestProtocol::default()), "myproto", &[1u8]).unwrap();
}
#[test]
fn test_net_connect() {
let key1 = KeyPair::create().unwrap();
let mut config1 = NetworkConfiguration::new_with_port(30344);
config1.use_secret = Some(key1.secret().clone());
config1.boot_nodes = Some(vec![ ]);
let mut config2 = NetworkConfiguration::new_with_port(30345);
config2.boot_nodes = Some(vec![ format!("enode://{}@127.0.0.1:30344", key1.public().hex()) ]);
let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap();
let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap();
let handler1 = TestProtocol::register(&mut service1);
let handler2 = TestProtocol::register(&mut service2);
while !handler1.got_packet() && !handler2.got_packet() {
thread::sleep(Duration::from_millis(50));
}
assert!(service1.stats().sessions() >= 1);
assert!(service2.stats().sessions() >= 1);
}
#[test]
fn test_net_timeout() {
let config = NetworkConfiguration::new_with_port(30346);
let mut service = NetworkService::<TestProtocolMessage>::start(config).unwrap();
let handler = TestProtocol::register(&mut service);
while !handler.got_timeout() {
thread::sleep(Duration::from_millis(50));
}
}