commit
692e5307e1
2
cov.sh
2
cov.sh
@ -17,5 +17,5 @@ fi
|
|||||||
|
|
||||||
cargo test --no-run || exit $?
|
cargo test --no-run || exit $?
|
||||||
mkdir -p target/coverage
|
mkdir -p target/coverage
|
||||||
kcov --exclude-pattern ~/.multirust --include-pattern src --verify target/coverage target/debug/ethcore*
|
kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1 --include-pattern src --verify target/coverage target/debug/ethcore*
|
||||||
xdg-open target/coverage/index.html
|
xdg-open target/coverage/index.html
|
||||||
|
@ -1 +1 @@
|
|||||||
Subproject commit dc86e6359675440aea59ddb48648a01c799925d8
|
Subproject commit e838fd90998fc5502d0b7c9427a4c231f9a6953d
|
@ -61,13 +61,13 @@ fn main() {
|
|||||||
setup_log(&args.flag_logging);
|
setup_log(&args.flag_logging);
|
||||||
|
|
||||||
let spec = ethereum::new_frontier();
|
let spec = ethereum::new_frontier();
|
||||||
|
|
||||||
let init_nodes = match &args.arg_enode {
|
let init_nodes = match &args.arg_enode {
|
||||||
&None => spec.nodes().clone(),
|
&None => spec.nodes().clone(),
|
||||||
&Some(ref enodes) => enodes.clone(),
|
&Some(ref enodes) => enodes.clone(),
|
||||||
};
|
};
|
||||||
|
let mut net_settings = NetworkConfiguration::new();
|
||||||
let mut service = ClientService::start(spec, &init_nodes).unwrap();
|
net_settings.boot_nodes = init_nodes;
|
||||||
|
let mut service = ClientService::start(spec, net_settings).unwrap();
|
||||||
let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: service.sync() });
|
let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: service.sync() });
|
||||||
service.io().register_handler(io_handler).expect("Error registering IO handler");
|
service.io().register_handler(io_handler).expect("Error registering IO handler");
|
||||||
|
|
||||||
|
@ -162,14 +162,10 @@ impl Client {
|
|||||||
let db = Arc::new(DB::open(&opts, state_path.to_str().unwrap()).unwrap());
|
let db = Arc::new(DB::open(&opts, state_path.to_str().unwrap()).unwrap());
|
||||||
|
|
||||||
let engine = Arc::new(try!(spec.to_engine()));
|
let engine = Arc::new(try!(spec.to_engine()));
|
||||||
{
|
let mut state_db = JournalDB::new_with_arc(db.clone());
|
||||||
let mut state_db = JournalDB::new_with_arc(db.clone());
|
if engine.spec().ensure_db_good(&mut state_db) {
|
||||||
if engine.spec().ensure_db_good(&mut state_db) {
|
state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB");
|
||||||
state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
let state_db = JournalDB::new_with_arc(db);
|
|
||||||
|
|
||||||
Ok(Arc::new(Client {
|
Ok(Arc::new(Client {
|
||||||
chain: chain,
|
chain: chain,
|
||||||
engine: engine.clone(),
|
engine: engine.clone(),
|
||||||
|
@ -26,8 +26,8 @@ pub struct ClientService {
|
|||||||
|
|
||||||
impl ClientService {
|
impl ClientService {
|
||||||
/// Start the service in a separate thread.
|
/// Start the service in a separate thread.
|
||||||
pub fn start(spec: Spec, init_nodes: &[String]) -> Result<ClientService, Error> {
|
pub fn start(spec: Spec, net_config: NetworkConfiguration) -> Result<ClientService, Error> {
|
||||||
let mut net_service = try!(NetworkService::start(init_nodes));
|
let mut net_service = try!(NetworkService::start(net_config));
|
||||||
info!("Starting {}", net_service.host_info());
|
info!("Starting {}", net_service.host_info());
|
||||||
info!("Configured for {} using {} engine", spec.name, spec.engine_name);
|
info!("Configured for {} using {} engine", spec.name, spec.engine_name);
|
||||||
let mut dir = env::home_dir().unwrap();
|
let mut dir = env::home_dir().unwrap();
|
||||||
|
@ -475,6 +475,9 @@ impl ChainSync {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
if max_height != x!(0) {
|
||||||
|
self.sync_peer(io, peer_id, true);
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,6 +74,8 @@ pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + Clone + '
|
|||||||
fn register_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
|
fn register_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
|
||||||
/// Re-register a stream with the event loop
|
/// Re-register a stream with the event loop
|
||||||
fn update_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
|
fn update_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
|
||||||
|
/// Deregister a stream. Called whenstream is removed from event loop
|
||||||
|
fn deregister_stream(&self, _stream: StreamToken, _event_loop: &mut EventLoop<IoManager<Message>>) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// TODO [arkpar] Please document me
|
/// TODO [arkpar] Please document me
|
||||||
|
@ -42,6 +42,10 @@ pub enum IoMessage<Message> where Message: Send + Clone + Sized {
|
|||||||
handler_id: HandlerId,
|
handler_id: HandlerId,
|
||||||
token: StreamToken,
|
token: StreamToken,
|
||||||
},
|
},
|
||||||
|
DeregisterStream {
|
||||||
|
handler_id: HandlerId,
|
||||||
|
token: StreamToken,
|
||||||
|
},
|
||||||
UpdateStreamRegistration {
|
UpdateStreamRegistration {
|
||||||
handler_id: HandlerId,
|
handler_id: HandlerId,
|
||||||
token: StreamToken,
|
token: StreamToken,
|
||||||
@ -83,6 +87,7 @@ impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
|
|||||||
}));
|
}));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register a new IO stream.
|
/// Register a new IO stream.
|
||||||
pub fn register_stream(&self, token: StreamToken) -> Result<(), UtilError> {
|
pub fn register_stream(&self, token: StreamToken) -> Result<(), UtilError> {
|
||||||
try!(self.channel.send_io(IoMessage::RegisterStream {
|
try!(self.channel.send_io(IoMessage::RegisterStream {
|
||||||
@ -92,6 +97,15 @@ impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Deregister an IO stream.
|
||||||
|
pub fn deregister_stream(&self, token: StreamToken) -> Result<(), UtilError> {
|
||||||
|
try!(self.channel.send_io(IoMessage::DeregisterStream {
|
||||||
|
token: token,
|
||||||
|
handler_id: self.handler,
|
||||||
|
}));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Reregister an IO stream.
|
/// Reregister an IO stream.
|
||||||
pub fn update_registration(&self, token: StreamToken) -> Result<(), UtilError> {
|
pub fn update_registration(&self, token: StreamToken) -> Result<(), UtilError> {
|
||||||
try!(self.channel.send_io(IoMessage::UpdateStreamRegistration {
|
try!(self.channel.send_io(IoMessage::UpdateStreamRegistration {
|
||||||
@ -214,6 +228,10 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
|||||||
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
|
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
|
||||||
handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
||||||
},
|
},
|
||||||
|
IoMessage::DeregisterStream { handler_id, token } => {
|
||||||
|
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
|
||||||
|
handler.deregister_stream(token, event_loop);
|
||||||
|
},
|
||||||
IoMessage::UpdateStreamRegistration { handler_id, token } => {
|
IoMessage::UpdateStreamRegistration { handler_id, token } => {
|
||||||
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
|
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
|
||||||
handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use mio::{Handler, Token, EventSet, EventLoop, PollOpt, TryRead, TryWrite};
|
use mio::{Handler, Token, EventSet, EventLoop, PollOpt, TryRead, TryWrite};
|
||||||
use mio::tcp::*;
|
use mio::tcp::*;
|
||||||
@ -10,6 +11,7 @@ use error::*;
|
|||||||
use io::{IoContext, StreamToken};
|
use io::{IoContext, StreamToken};
|
||||||
use network::error::NetworkError;
|
use network::error::NetworkError;
|
||||||
use network::handshake::Handshake;
|
use network::handshake::Handshake;
|
||||||
|
use network::stats::NetworkStats;
|
||||||
use crypto;
|
use crypto;
|
||||||
use rcrypto::blockmodes::*;
|
use rcrypto::blockmodes::*;
|
||||||
use rcrypto::aessafe::*;
|
use rcrypto::aessafe::*;
|
||||||
@ -34,6 +36,8 @@ pub struct Connection {
|
|||||||
send_queue: VecDeque<Cursor<Bytes>>,
|
send_queue: VecDeque<Cursor<Bytes>>,
|
||||||
/// Event flags this connection expects
|
/// Event flags this connection expects
|
||||||
interest: EventSet,
|
interest: EventSet,
|
||||||
|
/// Shared network staistics
|
||||||
|
stats: Arc<NetworkStats>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Connection write status.
|
/// Connection write status.
|
||||||
@ -47,7 +51,7 @@ pub enum WriteStatus {
|
|||||||
|
|
||||||
impl Connection {
|
impl Connection {
|
||||||
/// Create a new connection with given id and socket.
|
/// Create a new connection with given id and socket.
|
||||||
pub fn new(token: StreamToken, socket: TcpStream) -> Connection {
|
pub fn new(token: StreamToken, socket: TcpStream, stats: Arc<NetworkStats>) -> Connection {
|
||||||
Connection {
|
Connection {
|
||||||
token: token,
|
token: token,
|
||||||
socket: socket,
|
socket: socket,
|
||||||
@ -55,6 +59,7 @@ impl Connection {
|
|||||||
rec_buf: Bytes::new(),
|
rec_buf: Bytes::new(),
|
||||||
rec_size: 0,
|
rec_size: 0,
|
||||||
interest: EventSet::hup() | EventSet::readable(),
|
interest: EventSet::hup() | EventSet::readable(),
|
||||||
|
stats: stats,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -68,7 +73,6 @@ impl Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Readable IO handler. Called when there is some data to be read.
|
/// Readable IO handler. Called when there is some data to be read.
|
||||||
//TODO: return a slice
|
|
||||||
pub fn readable(&mut self) -> io::Result<Option<Bytes>> {
|
pub fn readable(&mut self) -> io::Result<Option<Bytes>> {
|
||||||
if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size {
|
if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size {
|
||||||
warn!(target:"net", "Unexpected connection read");
|
warn!(target:"net", "Unexpected connection read");
|
||||||
@ -77,9 +81,12 @@ impl Connection {
|
|||||||
// resolve "multiple applicable items in scope [E0034]" error
|
// resolve "multiple applicable items in scope [E0034]" error
|
||||||
let sock_ref = <TcpStream as Read>::by_ref(&mut self.socket);
|
let sock_ref = <TcpStream as Read>::by_ref(&mut self.socket);
|
||||||
match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) {
|
match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) {
|
||||||
Ok(Some(_)) if self.rec_buf.len() == self.rec_size => {
|
Ok(Some(size)) if size != 0 => {
|
||||||
self.rec_size = 0;
|
self.stats.inc_recv(size);
|
||||||
Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new())))
|
if self.rec_size != 0 && self.rec_buf.len() == self.rec_size {
|
||||||
|
self.rec_size = 0;
|
||||||
|
Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new())))
|
||||||
|
} else { Ok(None) }
|
||||||
},
|
},
|
||||||
Ok(_) => Ok(None),
|
Ok(_) => Ok(None),
|
||||||
Err(e) => Err(e),
|
Err(e) => Err(e),
|
||||||
@ -109,14 +116,17 @@ impl Connection {
|
|||||||
return Ok(WriteStatus::Complete)
|
return Ok(WriteStatus::Complete)
|
||||||
}
|
}
|
||||||
match self.socket.try_write_buf(buf) {
|
match self.socket.try_write_buf(buf) {
|
||||||
Ok(_) if (buf.position() as usize) < send_size => {
|
Ok(Some(size)) if (buf.position() as usize) < send_size => {
|
||||||
self.interest.insert(EventSet::writable());
|
self.interest.insert(EventSet::writable());
|
||||||
|
self.stats.inc_send(size);
|
||||||
Ok(WriteStatus::Ongoing)
|
Ok(WriteStatus::Ongoing)
|
||||||
},
|
},
|
||||||
Ok(_) if (buf.position() as usize) == send_size => {
|
Ok(Some(size)) if (buf.position() as usize) == send_size => {
|
||||||
|
self.stats.inc_send(size);
|
||||||
Ok(WriteStatus::Complete)
|
Ok(WriteStatus::Complete)
|
||||||
},
|
},
|
||||||
Ok(_) => { panic!("Wrote past buffer");},
|
Ok(Some(_)) => { panic!("Wrote past buffer");},
|
||||||
|
Ok(None) => Ok(WriteStatus::Ongoing),
|
||||||
Err(e) => Err(e)
|
Err(e) => Err(e)
|
||||||
}
|
}
|
||||||
}.and_then(|r| {
|
}.and_then(|r| {
|
||||||
@ -137,8 +147,8 @@ impl Connection {
|
|||||||
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
|
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
|
||||||
trace!(target: "net", "connection register; token={:?}", reg);
|
trace!(target: "net", "connection register; token={:?}", reg);
|
||||||
event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
|
event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
|
||||||
error!("Failed to register {:?}, {:?}", reg, e);
|
debug!("Failed to register {:?}, {:?}", reg, e);
|
||||||
Err(e)
|
Ok(())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -146,10 +156,17 @@ impl Connection {
|
|||||||
pub fn update_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
|
pub fn update_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
|
||||||
trace!(target: "net", "connection reregister; token={:?}", reg);
|
trace!(target: "net", "connection reregister; token={:?}", reg);
|
||||||
event_loop.reregister( &self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
|
event_loop.reregister( &self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
|
||||||
error!("Failed to reregister {:?}, {:?}", reg, e);
|
debug!("Failed to reregister {:?}, {:?}", reg, e);
|
||||||
Err(e)
|
Ok(())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete connection registration. Should be called at the end of the IO handler.
|
||||||
|
pub fn deregister_socket<Host: Handler>(&self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
|
||||||
|
trace!(target: "net", "connection deregister; token={:?}", self.token);
|
||||||
|
event_loop.deregister(&self.socket).ok(); // ignore errors here
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// RLPx packet
|
/// RLPx packet
|
||||||
@ -371,6 +388,12 @@ impl EncryptedConnection {
|
|||||||
try!(self.connection.update_socket(reg, event_loop));
|
try!(self.connection.update_socket(reg, event_loop));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete connection registration. This should be called at the end of the event loop.
|
||||||
|
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||||
|
try!(self.connection.deregister_socket(event_loop));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
use mio::*;
|
use mio::*;
|
||||||
use mio::tcp::*;
|
use mio::tcp::*;
|
||||||
use hash::*;
|
use hash::*;
|
||||||
@ -10,6 +11,7 @@ use network::host::{HostInfo};
|
|||||||
use network::node::NodeId;
|
use network::node::NodeId;
|
||||||
use error::*;
|
use error::*;
|
||||||
use network::error::NetworkError;
|
use network::error::NetworkError;
|
||||||
|
use network::stats::NetworkStats;
|
||||||
use io::{IoContext, StreamToken};
|
use io::{IoContext, StreamToken};
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, Debug)]
|
#[derive(PartialEq, Eq, Debug)]
|
||||||
@ -54,10 +56,10 @@ const HANDSHAKE_TIMEOUT: u64 = 30000;
|
|||||||
|
|
||||||
impl Handshake {
|
impl Handshake {
|
||||||
/// Create a new handshake object
|
/// Create a new handshake object
|
||||||
pub fn new(token: StreamToken, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result<Handshake, UtilError> {
|
pub fn new(token: StreamToken, id: Option<&NodeId>, socket: TcpStream, nonce: &H256, stats: Arc<NetworkStats>) -> Result<Handshake, UtilError> {
|
||||||
Ok(Handshake {
|
Ok(Handshake {
|
||||||
id: id.clone(),
|
id: if let Some(id) = id { id.clone()} else { NodeId::new() },
|
||||||
connection: Connection::new(token, socket),
|
connection: Connection::new(token, socket, stats),
|
||||||
originated: false,
|
originated: false,
|
||||||
state: HandshakeState::New,
|
state: HandshakeState::New,
|
||||||
ecdhe: try!(KeyPair::create()),
|
ecdhe: try!(KeyPair::create()),
|
||||||
@ -134,32 +136,45 @@ impl Handshake {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete registration
|
||||||
|
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||||
|
try!(self.connection.deregister_socket(event_loop));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Parse, validate and confirm auth message
|
/// Parse, validate and confirm auth message
|
||||||
fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
|
fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
|
||||||
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
|
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
|
||||||
assert!(data.len() == AUTH_PACKET_SIZE);
|
if data.len() != AUTH_PACKET_SIZE {
|
||||||
|
debug!(target:"net", "Wrong auth packet size");
|
||||||
|
return Err(From::from(NetworkError::BadProtocol));
|
||||||
|
}
|
||||||
self.auth_cipher = data.to_vec();
|
self.auth_cipher = data.to_vec();
|
||||||
let auth = try!(ecies::decrypt(host.secret(), data));
|
let auth = try!(ecies::decrypt(host.secret(), data));
|
||||||
let (sig, rest) = auth.split_at(65);
|
let (sig, rest) = auth.split_at(65);
|
||||||
let (hepubk, rest) = rest.split_at(32);
|
let (hepubk, rest) = rest.split_at(32);
|
||||||
let (pubk, rest) = rest.split_at(64);
|
let (pubk, rest) = rest.split_at(64);
|
||||||
let (nonce, _) = rest.split_at(32);
|
let (nonce, _) = rest.split_at(32);
|
||||||
self.remote_public.clone_from_slice(pubk);
|
self.id.clone_from_slice(pubk);
|
||||||
self.remote_nonce.clone_from_slice(nonce);
|
self.remote_nonce.clone_from_slice(nonce);
|
||||||
let shared = try!(ecdh::agree(host.secret(), &self.remote_public));
|
let shared = try!(ecdh::agree(host.secret(), &self.id));
|
||||||
let signature = Signature::from_slice(sig);
|
let signature = Signature::from_slice(sig);
|
||||||
let spub = try!(ec::recover(&signature, &(&shared ^ &self.remote_nonce)));
|
let spub = try!(ec::recover(&signature, &(&shared ^ &self.remote_nonce)));
|
||||||
|
self.remote_public = spub.clone();
|
||||||
if &spub.sha3()[..] != hepubk {
|
if &spub.sha3()[..] != hepubk {
|
||||||
trace!(target:"net", "Handshake hash mismath with {:?}", self.connection.socket.peer_addr());
|
trace!(target:"net", "Handshake hash mismath with {:?}", self.connection.socket.peer_addr());
|
||||||
return Err(From::from(NetworkError::Auth));
|
return Err(From::from(NetworkError::Auth));
|
||||||
};
|
};
|
||||||
self.write_ack()
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse and validate ack message
|
/// Parse and validate ack message
|
||||||
fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
|
fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
|
||||||
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
|
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
|
||||||
assert!(data.len() == ACK_PACKET_SIZE);
|
if data.len() != ACK_PACKET_SIZE {
|
||||||
|
debug!(target:"net", "Wrong ack packet size");
|
||||||
|
return Err(From::from(NetworkError::BadProtocol));
|
||||||
|
}
|
||||||
self.ack_cipher = data.to_vec();
|
self.ack_cipher = data.to_vec();
|
||||||
let ack = try!(ecies::decrypt(host.secret(), data));
|
let ack = try!(ecies::decrypt(host.secret(), data));
|
||||||
self.remote_public.clone_from_slice(&ack[0..64]);
|
self.remote_public.clone_from_slice(&ack[0..64]);
|
||||||
|
@ -17,6 +17,7 @@ use error::*;
|
|||||||
use io::*;
|
use io::*;
|
||||||
use network::NetworkProtocolHandler;
|
use network::NetworkProtocolHandler;
|
||||||
use network::node::*;
|
use network::node::*;
|
||||||
|
use network::stats::NetworkStats;
|
||||||
|
|
||||||
type Slab<T> = ::slab::Slab<T, usize>;
|
type Slab<T> = ::slab::Slab<T, usize>;
|
||||||
|
|
||||||
@ -28,24 +29,45 @@ const IDEAL_PEERS: u32 = 10;
|
|||||||
const MAINTENANCE_TIMEOUT: u64 = 1000;
|
const MAINTENANCE_TIMEOUT: u64 = 1000;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct NetworkConfiguration {
|
/// Network service configuration
|
||||||
listen_address: SocketAddr,
|
pub struct NetworkConfiguration {
|
||||||
public_address: SocketAddr,
|
/// IP address to listen for incoming connections
|
||||||
nat_enabled: bool,
|
pub listen_address: SocketAddr,
|
||||||
discovery_enabled: bool,
|
/// IP address to advertise
|
||||||
pin: bool,
|
pub public_address: SocketAddr,
|
||||||
|
/// Enable NAT configuration
|
||||||
|
pub nat_enabled: bool,
|
||||||
|
/// Enable discovery
|
||||||
|
pub discovery_enabled: bool,
|
||||||
|
/// Pin to boot nodes only
|
||||||
|
pub pin: bool,
|
||||||
|
/// List of initial node addresses
|
||||||
|
pub boot_nodes: Vec<String>,
|
||||||
|
/// Use provided node key instead of default
|
||||||
|
pub use_secret: Option<Secret>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NetworkConfiguration {
|
impl NetworkConfiguration {
|
||||||
fn new() -> NetworkConfiguration {
|
/// Create a new instance of default settings.
|
||||||
|
pub fn new() -> NetworkConfiguration {
|
||||||
NetworkConfiguration {
|
NetworkConfiguration {
|
||||||
listen_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(),
|
listen_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(),
|
||||||
public_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(),
|
public_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(),
|
||||||
nat_enabled: true,
|
nat_enabled: true,
|
||||||
discovery_enabled: true,
|
discovery_enabled: true,
|
||||||
pin: false,
|
pin: false,
|
||||||
|
boot_nodes: Vec::new(),
|
||||||
|
use_secret: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create new default configuration with sepcified listen port.
|
||||||
|
pub fn new_with_port(port: u16) -> NetworkConfiguration {
|
||||||
|
let mut config = NetworkConfiguration::new();
|
||||||
|
config.listen_address = SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap();
|
||||||
|
config.public_address = SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap();
|
||||||
|
config
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tokens
|
// Tokens
|
||||||
@ -243,18 +265,19 @@ pub struct Host<Message> where Message: Send + Sync + Clone {
|
|||||||
handlers: RwLock<HashMap<ProtocolId, Arc<NetworkProtocolHandler<Message>>>>,
|
handlers: RwLock<HashMap<ProtocolId, Arc<NetworkProtocolHandler<Message>>>>,
|
||||||
timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
|
timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
|
||||||
timer_counter: RwLock<usize>,
|
timer_counter: RwLock<usize>,
|
||||||
|
stats: Arc<NetworkStats>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||||
pub fn new() -> Host<Message> {
|
/// Create a new instance
|
||||||
let config = NetworkConfiguration::new();
|
pub fn new(config: NetworkConfiguration) -> Host<Message> {
|
||||||
let addr = config.listen_address;
|
let addr = config.listen_address;
|
||||||
// Setup the server socket
|
// Setup the server socket
|
||||||
let tcp_listener = TcpListener::bind(&addr).unwrap();
|
let tcp_listener = TcpListener::bind(&addr).unwrap();
|
||||||
let udp_socket = UdpSocket::bound(&addr).unwrap();
|
let udp_socket = UdpSocket::bound(&addr).unwrap();
|
||||||
let host = Host::<Message> {
|
let mut host = Host::<Message> {
|
||||||
info: RwLock::new(HostInfo {
|
info: RwLock::new(HostInfo {
|
||||||
keys: KeyPair::create().unwrap(),
|
keys: if let Some(ref secret) = config.use_secret { KeyPair::from_secret(secret.clone()).unwrap() } else { KeyPair::create().unwrap() },
|
||||||
config: config,
|
config: config,
|
||||||
nonce: H256::random(),
|
nonce: H256::random(),
|
||||||
protocol_version: 4,
|
protocol_version: 4,
|
||||||
@ -269,6 +292,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
handlers: RwLock::new(HashMap::new()),
|
handlers: RwLock::new(HashMap::new()),
|
||||||
timers: RwLock::new(HashMap::new()),
|
timers: RwLock::new(HashMap::new()),
|
||||||
timer_counter: RwLock::new(LAST_CONNECTION + 1),
|
timer_counter: RwLock::new(LAST_CONNECTION + 1),
|
||||||
|
stats: Arc::new(NetworkStats::default()),
|
||||||
};
|
};
|
||||||
let port = host.info.read().unwrap().config.listen_address.port();
|
let port = host.info.read().unwrap().config.listen_address.port();
|
||||||
host.info.write().unwrap().deref_mut().listen_port = port;
|
host.info.write().unwrap().deref_mut().listen_port = port;
|
||||||
@ -278,9 +302,18 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
Some(iface) => config.public_address = iface.addr.unwrap(),
|
Some(iface) => config.public_address = iface.addr.unwrap(),
|
||||||
None => warn!("No public network interface"),
|
None => warn!("No public network interface"),
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
|
||||||
|
for n in boot_nodes {
|
||||||
|
host.add_node(&n);
|
||||||
|
}
|
||||||
host
|
host
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn stats(&self) -> Arc<NetworkStats> {
|
||||||
|
self.stats.clone()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn add_node(&mut self, id: &str) {
|
pub fn add_node(&mut self, id: &str) {
|
||||||
match Node::from_str(id) {
|
match Node::from_str(id) {
|
||||||
Err(e) => { warn!("Could not add node: {:?}", e); },
|
Err(e) => { warn!("Could not add node: {:?}", e); },
|
||||||
@ -358,7 +391,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(single_match)]
|
#[allow(single_match)]
|
||||||
#[allow(block_in_if_condition_stmt)]
|
|
||||||
fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage<Message>>) {
|
fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
if self.have_session(id)
|
if self.have_session(id)
|
||||||
{
|
{
|
||||||
@ -385,11 +417,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
self.create_connection(socket, Some(id), io);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(block_in_if_condition_stmt)]
|
||||||
|
fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
let nonce = self.info.write().unwrap().next_nonce();
|
let nonce = self.info.write().unwrap().next_nonce();
|
||||||
if self.connections.write().unwrap().insert_with(|token| {
|
let mut connections = self.connections.write().unwrap();
|
||||||
let mut handshake = Handshake::new(token, id, socket, &nonce).expect("Can't create handshake");
|
if connections.insert_with(|token| {
|
||||||
handshake.start(io, &self.info.read().unwrap(), true).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| {
|
let mut handshake = Handshake::new(token, id, socket, &nonce, self.stats.clone()).expect("Can't create handshake");
|
||||||
|
handshake.start(io, &self.info.read().unwrap(), id.is_some()).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| {
|
||||||
debug!(target: "net", "Handshake create error: {:?}", e);
|
debug!(target: "net", "Handshake create error: {:?}", e);
|
||||||
});
|
});
|
||||||
Arc::new(Mutex::new(ConnectionEntry::Handshake(handshake)))
|
Arc::new(Mutex::new(ConnectionEntry::Handshake(handshake)))
|
||||||
@ -398,8 +435,20 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn accept(&self, _io: &IoContext<NetworkIoMessage<Message>>) {
|
fn accept(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
trace!(target: "net", "accept");
|
trace!(target: "net", "accept");
|
||||||
|
loop {
|
||||||
|
let socket = match self.tcp_listener.lock().unwrap().accept() {
|
||||||
|
Ok(None) => break,
|
||||||
|
Ok(Some((sock, _addr))) => sock,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Error accepting connection: {:?}", e);
|
||||||
|
break
|
||||||
|
},
|
||||||
|
};
|
||||||
|
self.create_connection(socket, None, io);
|
||||||
|
}
|
||||||
|
io.update_registration(TCP_ACCEPT).expect("Error registering TCP listener");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(single_match)]
|
#[allow(single_match)]
|
||||||
@ -508,11 +557,13 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn start_session(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
|
fn start_session(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
self.connections.write().unwrap().replace_with(token, |c| {
|
let mut connections = self.connections.write().unwrap();
|
||||||
|
connections.replace_with(token, |c| {
|
||||||
match Arc::try_unwrap(c).ok().unwrap().into_inner().unwrap() {
|
match Arc::try_unwrap(c).ok().unwrap().into_inner().unwrap() {
|
||||||
ConnectionEntry::Handshake(h) => {
|
ConnectionEntry::Handshake(h) => {
|
||||||
let session = Session::new(h, io, &self.info.read().unwrap()).expect("Session creation error");
|
let session = Session::new(h, io, &self.info.read().unwrap()).expect("Session creation error");
|
||||||
io.update_registration(token).expect("Error updating session registration");
|
io.update_registration(token).expect("Error updating session registration");
|
||||||
|
self.stats.inc_sessions();
|
||||||
Some(Arc::new(Mutex::new(ConnectionEntry::Session(session))))
|
Some(Arc::new(Mutex::new(ConnectionEntry::Session(session))))
|
||||||
},
|
},
|
||||||
_ => { None } // handshake expired
|
_ => { None } // handshake expired
|
||||||
@ -544,6 +595,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
_ => {},
|
_ => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
io.deregister_stream(token).expect("Error deregistering stream");
|
||||||
}
|
}
|
||||||
for p in to_disconnect {
|
for p in to_disconnect {
|
||||||
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
|
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
|
||||||
@ -656,6 +708,24 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn deregister_stream(&self, stream: StreamToken, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
|
||||||
|
match stream {
|
||||||
|
FIRST_CONNECTION ... LAST_CONNECTION => {
|
||||||
|
let mut connections = self.connections.write().unwrap();
|
||||||
|
if let Some(connection) = connections.get(stream).cloned() {
|
||||||
|
match *connection.lock().unwrap().deref() {
|
||||||
|
ConnectionEntry::Handshake(ref h) => h.deregister_socket(event_loop).expect("Error deregistering socket"),
|
||||||
|
ConnectionEntry::Session(ref s) => s.deregister_socket(event_loop).expect("Error deregistering session socket"),
|
||||||
|
}
|
||||||
|
connections.remove(stream);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
NODETABLE_RECEIVE => event_loop.deregister(self.udp_socket.lock().unwrap().deref()).unwrap(),
|
||||||
|
TCP_ACCEPT => event_loop.deregister(self.tcp_listener.lock().unwrap().deref()).unwrap(),
|
||||||
|
_ => warn!("Unexpected stream deregistration")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
|
fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
|
||||||
match stream {
|
match stream {
|
||||||
FIRST_CONNECTION ... LAST_CONNECTION => {
|
FIRST_CONNECTION ... LAST_CONNECTION => {
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
/// Network and general IO module.
|
/// Network and general IO module.
|
||||||
///
|
|
||||||
/// Example usage for craeting a network service and adding an IO handler:
|
/// Example usage for craeting a network service and adding an IO handler:
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
@ -40,7 +39,7 @@
|
|||||||
/// }
|
/// }
|
||||||
///
|
///
|
||||||
/// fn main () {
|
/// fn main () {
|
||||||
/// let mut service = NetworkService::<MyMessage>::start().expect("Error creating network service");
|
/// let mut service = NetworkService::<MyMessage>::start(NetworkConfiguration::new()).expect("Error creating network service");
|
||||||
/// service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]);
|
/// service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]);
|
||||||
///
|
///
|
||||||
/// // Wait for quit condition
|
/// // Wait for quit condition
|
||||||
@ -56,21 +55,20 @@ mod discovery;
|
|||||||
mod service;
|
mod service;
|
||||||
mod error;
|
mod error;
|
||||||
mod node;
|
mod node;
|
||||||
|
mod stats;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests;
|
||||||
|
|
||||||
/// TODO [arkpar] Please document me
|
|
||||||
pub use network::host::PeerId;
|
pub use network::host::PeerId;
|
||||||
/// TODO [arkpar] Please document me
|
|
||||||
pub use network::host::PacketId;
|
pub use network::host::PacketId;
|
||||||
/// TODO [arkpar] Please document me
|
|
||||||
pub use network::host::NetworkContext;
|
pub use network::host::NetworkContext;
|
||||||
/// TODO [arkpar] Please document me
|
|
||||||
pub use network::service::NetworkService;
|
pub use network::service::NetworkService;
|
||||||
/// TODO [arkpar] Please document me
|
|
||||||
pub use network::host::NetworkIoMessage;
|
pub use network::host::NetworkIoMessage;
|
||||||
/// TODO [arkpar] Please document me
|
|
||||||
pub use network::host::NetworkIoMessage::User as UserMessage;
|
pub use network::host::NetworkIoMessage::User as UserMessage;
|
||||||
/// TODO [arkpar] Please document me
|
|
||||||
pub use network::error::NetworkError;
|
pub use network::error::NetworkError;
|
||||||
|
pub use network::host::NetworkConfiguration;
|
||||||
|
pub use network::stats::NetworkStats;
|
||||||
|
|
||||||
use io::TimerToken;
|
use io::TimerToken;
|
||||||
|
|
||||||
@ -92,44 +90,3 @@ pub trait NetworkProtocolHandler<Message>: Sync + Send where Message: Send + Syn
|
|||||||
fn message(&self, _io: &NetworkContext<Message>, _message: &Message) {}
|
fn message(&self, _io: &NetworkContext<Message>, _message: &Message) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_net_service() {
|
|
||||||
|
|
||||||
use std::sync::Arc;
|
|
||||||
struct MyHandler;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct MyMessage {
|
|
||||||
data: u32
|
|
||||||
}
|
|
||||||
|
|
||||||
impl NetworkProtocolHandler<MyMessage> for MyHandler {
|
|
||||||
fn initialize(&self, io: &NetworkContext<MyMessage>) {
|
|
||||||
io.register_timer(0, 1000).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read(&self, _io: &NetworkContext<MyMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
|
||||||
println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn connected(&self, _io: &NetworkContext<MyMessage>, peer: &PeerId) {
|
|
||||||
println!("Connected {}", peer);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn disconnected(&self, _io: &NetworkContext<MyMessage>, peer: &PeerId) {
|
|
||||||
println!("Disconnected {}", peer);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn timeout(&self, _io: &NetworkContext<MyMessage>, timer: TimerToken) {
|
|
||||||
println!("Timeout {}", timer);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn message(&self, _io: &NetworkContext<MyMessage>, message: &MyMessage) {
|
|
||||||
println!("Message {}", message.data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut service = NetworkService::<MyMessage>::start().expect("Error creating network service");
|
|
||||||
service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]).unwrap();
|
|
||||||
}
|
|
||||||
|
@ -1,8 +1,9 @@
|
|||||||
use std::sync::*;
|
use std::sync::*;
|
||||||
use error::*;
|
use error::*;
|
||||||
use network::{NetworkProtocolHandler};
|
use network::{NetworkProtocolHandler, NetworkConfiguration};
|
||||||
use network::error::{NetworkError};
|
use network::error::{NetworkError};
|
||||||
use network::host::{Host, NetworkIoMessage, ProtocolId};
|
use network::host::{Host, NetworkIoMessage, ProtocolId};
|
||||||
|
use network::stats::{NetworkStats};
|
||||||
use io::*;
|
use io::*;
|
||||||
|
|
||||||
/// IO Service with networking
|
/// IO Service with networking
|
||||||
@ -10,21 +11,22 @@ use io::*;
|
|||||||
pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static {
|
pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static {
|
||||||
io_service: IoService<NetworkIoMessage<Message>>,
|
io_service: IoService<NetworkIoMessage<Message>>,
|
||||||
host_info: String,
|
host_info: String,
|
||||||
|
stats: Arc<NetworkStats>
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'static {
|
impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'static {
|
||||||
/// Starts IO event loop
|
/// Starts IO event loop
|
||||||
pub fn start(init_nodes: &[String]) -> Result<NetworkService<Message>, UtilError> {
|
pub fn start(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> {
|
||||||
let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
|
let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
|
||||||
let mut host = Host::new();
|
let host = Arc::new(Host::new(config));
|
||||||
for n in init_nodes { host.add_node(&n); }
|
let stats = host.stats().clone();
|
||||||
let host = Arc::new(host);
|
|
||||||
let host_info = host.client_version();
|
let host_info = host.client_version();
|
||||||
info!("NetworkService::start(): id={:?}", host.client_id());
|
info!("NetworkService::start(): id={:?}", host.client_id());
|
||||||
try!(io_service.register_handler(host));
|
try!(io_service.register_handler(host));
|
||||||
Ok(NetworkService {
|
Ok(NetworkService {
|
||||||
io_service: io_service,
|
io_service: io_service,
|
||||||
host_info: host_info,
|
host_info: host_info,
|
||||||
|
stats: stats,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -47,5 +49,10 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
|
|||||||
pub fn io(&mut self) -> &mut IoService<NetworkIoMessage<Message>> {
|
pub fn io(&mut self) -> &mut IoService<NetworkIoMessage<Message>> {
|
||||||
&mut self.io_service
|
&mut self.io_service
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns underlying io service.
|
||||||
|
pub fn stats(&self) -> &NetworkStats {
|
||||||
|
&self.stats
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,6 +131,11 @@ impl Session {
|
|||||||
self.connection.update_socket(reg, event_loop)
|
self.connection.update_socket(reg, event_loop)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete registration
|
||||||
|
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||||
|
self.connection.deregister_socket(event_loop)
|
||||||
|
}
|
||||||
|
|
||||||
/// Send a protocol packet to peer.
|
/// Send a protocol packet to peer.
|
||||||
pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> {
|
pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> {
|
||||||
let mut i = 0usize;
|
let mut i = 0usize;
|
||||||
|
51
util/src/network/stats.rs
Normal file
51
util/src/network/stats.rs
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
//! Network Statistics
|
||||||
|
use std::sync::atomic::*;
|
||||||
|
|
||||||
|
/// Network statistics structure
|
||||||
|
#[derive(Default, Debug)]
|
||||||
|
pub struct NetworkStats {
|
||||||
|
/// Bytes received
|
||||||
|
recv: AtomicUsize,
|
||||||
|
/// Bytes sent
|
||||||
|
send: AtomicUsize,
|
||||||
|
/// Total number of sessions created
|
||||||
|
sessions: AtomicUsize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NetworkStats {
|
||||||
|
/// Increase bytes received.
|
||||||
|
#[inline]
|
||||||
|
pub fn inc_recv(&self, size: usize) {
|
||||||
|
self.recv.fetch_add(size, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Increase bytes sent.
|
||||||
|
#[inline]
|
||||||
|
pub fn inc_send(&self, size: usize) {
|
||||||
|
self.send.fetch_add(size, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Increase number of sessions.
|
||||||
|
#[inline]
|
||||||
|
pub fn inc_sessions(&self) {
|
||||||
|
self.sessions.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get bytes sent.
|
||||||
|
#[inline]
|
||||||
|
pub fn send(&self) -> usize {
|
||||||
|
self.send.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get bytes received.
|
||||||
|
#[inline]
|
||||||
|
pub fn recv(&self) -> usize {
|
||||||
|
self.recv.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get total number of sessions created.
|
||||||
|
#[inline]
|
||||||
|
pub fn sessions(&self) -> usize {
|
||||||
|
self.sessions.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
}
|
103
util/src/network/tests.rs
Normal file
103
util/src/network/tests.rs
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
|
||||||
|
use std::thread;
|
||||||
|
use std::time::*;
|
||||||
|
use common::*;
|
||||||
|
use network::*;
|
||||||
|
use io::TimerToken;
|
||||||
|
use crypto::KeyPair;
|
||||||
|
|
||||||
|
pub struct TestProtocol {
|
||||||
|
pub packet: Mutex<Bytes>,
|
||||||
|
pub got_timeout: AtomicBool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for TestProtocol {
|
||||||
|
fn default() -> Self {
|
||||||
|
TestProtocol {
|
||||||
|
packet: Mutex::new(Vec::new()),
|
||||||
|
got_timeout: AtomicBool::new(false),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct TestProtocolMessage {
|
||||||
|
payload: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TestProtocol {
|
||||||
|
/// Creates and register protocol with the network service
|
||||||
|
pub fn register(service: &mut NetworkService<TestProtocolMessage>) -> Arc<TestProtocol> {
|
||||||
|
let handler = Arc::new(TestProtocol::default());
|
||||||
|
service.register_protocol(handler.clone(), "test", &[42u8, 43u8]).expect("Error registering test protocol handler");
|
||||||
|
handler
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn got_packet(&self) -> bool {
|
||||||
|
self.packet.lock().unwrap().deref()[..] == b"hello"[..]
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn got_timeout(&self) -> bool {
|
||||||
|
self.got_timeout.load(AtomicOrdering::Relaxed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NetworkProtocolHandler<TestProtocolMessage> for TestProtocol {
|
||||||
|
fn initialize(&self, io: &NetworkContext<TestProtocolMessage>) {
|
||||||
|
io.register_timer(0, 10).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read(&self, _io: &NetworkContext<TestProtocolMessage>, _peer: &PeerId, packet_id: u8, data: &[u8]) {
|
||||||
|
assert_eq!(packet_id, 33);
|
||||||
|
self.packet.lock().unwrap().extend(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connected(&self, io: &NetworkContext<TestProtocolMessage>, _peer: &PeerId) {
|
||||||
|
io.respond(33, "hello".to_owned().into_bytes()).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn disconnected(&self, _io: &NetworkContext<TestProtocolMessage>, _peer: &PeerId) {
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Timer function called after a timeout created with `NetworkContext::timeout`.
|
||||||
|
fn timeout(&self, _io: &NetworkContext<TestProtocolMessage>, timer: TimerToken) {
|
||||||
|
assert_eq!(timer, 0);
|
||||||
|
self.got_timeout.store(true, AtomicOrdering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_net_service() {
|
||||||
|
let mut service = NetworkService::<TestProtocolMessage>::start(NetworkConfiguration::new()).expect("Error creating network service");
|
||||||
|
service.register_protocol(Arc::new(TestProtocol::default()), "myproto", &[1u8]).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_net_connect() {
|
||||||
|
let key1 = KeyPair::create().unwrap();
|
||||||
|
let mut config1 = NetworkConfiguration::new_with_port(30344);
|
||||||
|
config1.use_secret = Some(key1.secret().clone());
|
||||||
|
config1.boot_nodes = Some(vec![ ]);
|
||||||
|
let mut config2 = NetworkConfiguration::new_with_port(30345);
|
||||||
|
config2.boot_nodes = Some(vec![ format!("enode://{}@127.0.0.1:30344", key1.public().hex()) ]);
|
||||||
|
let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap();
|
||||||
|
let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap();
|
||||||
|
let handler1 = TestProtocol::register(&mut service1);
|
||||||
|
let handler2 = TestProtocol::register(&mut service2);
|
||||||
|
while !handler1.got_packet() && !handler2.got_packet() {
|
||||||
|
thread::sleep(Duration::from_millis(50));
|
||||||
|
}
|
||||||
|
assert!(service1.stats().sessions() >= 1);
|
||||||
|
assert!(service2.stats().sessions() >= 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_net_timeout() {
|
||||||
|
let config = NetworkConfiguration::new_with_port(30346);
|
||||||
|
let mut service = NetworkService::<TestProtocolMessage>::start(config).unwrap();
|
||||||
|
let handler = TestProtocol::register(&mut service);
|
||||||
|
while !handler.got_timeout() {
|
||||||
|
thread::sleep(Duration::from_millis(50));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user