Merge pull request #1313 from ethcore/net

Network start/stop
This commit is contained in:
Arkadiy Paronyan
2016-06-18 11:04:24 +02:00
committed by GitHub
19 changed files with 298 additions and 109 deletions

View File

@@ -18,7 +18,7 @@ use std::net::{SocketAddr};
use std::collections::{HashMap};
use std::str::{FromStr};
use std::sync::*;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::ops::*;
use std::cmp::min;
use std::path::{Path, PathBuf};
@@ -50,7 +50,7 @@ const MAX_HANDSHAKES: usize = 80;
const MAX_HANDSHAKES_PER_ROUND: usize = 32;
const MAINTENANCE_TIMEOUT: u64 = 1000;
#[derive(Debug)]
#[derive(Debug, Clone)]
/// Network service configuration
pub struct NetworkConfiguration {
/// Directory path to store network configuration. None means nothing will be saved
@@ -234,6 +234,11 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
self.io.message(NetworkIoMessage::User(msg));
}
/// Send an IO message
pub fn io_channel(&self) -> IoChannel<NetworkIoMessage<Message>> {
self.io.channel()
}
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
pub fn disable_peer(&self, peer: PeerId) {
//TODO: remove capability, disconnect if no capabilities left
@@ -245,6 +250,11 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
self.io.message(NetworkIoMessage::Disconnect(peer));
}
/// Sheck if the session is till active.
pub fn is_expired(&self) -> bool {
self.session.as_ref().map(|s| s.lock().unwrap().expired()).unwrap_or(false)
}
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
pub fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), UtilError> {
self.io.message(NetworkIoMessage::AddTimer {
@@ -324,11 +334,12 @@ pub struct Host<Message> where Message: Send + Sync + Clone {
stats: Arc<NetworkStats>,
pinned_nodes: Vec<NodeId>,
num_sessions: AtomicUsize,
stopping: AtomicBool,
}
impl<Message> Host<Message> where Message: Send + Sync + Clone {
/// Create a new instance
pub fn new(config: NetworkConfiguration) -> Result<Host<Message>, UtilError> {
pub fn new(config: NetworkConfiguration, stats: Arc<NetworkStats>) -> Result<Host<Message>, UtilError> {
let mut listen_address = match config.listen_address {
None => SocketAddr::from_str("0.0.0.0:30304").unwrap(),
Some(addr) => addr,
@@ -372,9 +383,10 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
handlers: RwLock::new(HashMap::new()),
timers: RwLock::new(HashMap::new()),
timer_counter: RwLock::new(USER_TIMER),
stats: Arc::new(NetworkStats::default()),
stats: stats,
pinned_nodes: Vec::new(),
num_sessions: AtomicUsize::new(0),
stopping: AtomicBool::new(false),
};
let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
@@ -384,10 +396,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
Ok(host)
}
pub fn stats(&self) -> Arc<NetworkStats> {
self.stats.clone()
}
pub fn add_node(&mut self, id: &str) {
match Node::from_str(id) {
Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); },
@@ -402,8 +410,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
}
pub fn client_version(&self) -> String {
self.info.read().unwrap().client_version.clone()
pub fn client_version() -> String {
version()
}
pub fn external_url(&self) -> Option<String> {
@@ -416,6 +424,22 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
r
}
pub fn stop(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
self.stopping.store(true, AtomicOrdering::Release);
let mut to_kill = Vec::new();
for e in self.sessions.write().unwrap().iter_mut() {
let mut s = e.lock().unwrap();
s.disconnect(io, DisconnectReason::ClientQuit);
to_kill.push(s.token());
}
for p in to_kill {
trace!(target: "network", "Disconnecting on shutdown: {}", p);
self.kill_connection(p, io, true);
}
try!(io.unregister_handler());
Ok(())
}
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
io.clear_timer(INIT_PUBLIC).unwrap();
if self.info.read().unwrap().public_endpoint.is_some() {
@@ -787,6 +811,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
}
fn stream_readable(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) {
if self.stopping.load(AtomicOrdering::Acquire) {
return;
}
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
DISCOVERY => {
@@ -802,6 +829,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
}
fn stream_writable(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) {
if self.stopping.load(AtomicOrdering::Acquire) {
return;
}
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
DISCOVERY => {
@@ -813,6 +843,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
}
fn timeout(&self, io: &IoContext<NetworkIoMessage<Message>>, token: TimerToken) {
if self.stopping.load(AtomicOrdering::Acquire) {
return;
}
match token {
IDLE => self.maintain_network(io),
INIT_PUBLIC => self.init_public_interface(io).unwrap_or_else(|e|
@@ -835,8 +868,8 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
},
_ => match self.timers.read().unwrap().get(&token).cloned() {
Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() {
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); }
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); }
},
None => { warn!("Unknown timer token: {}", token); } // timer is not registerd through us
}
@@ -844,6 +877,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
}
fn message(&self, io: &IoContext<NetworkIoMessage<Message>>, message: &NetworkIoMessage<Message>) {
if self.stopping.load(AtomicOrdering::Acquire) {
return;
}
match *message {
NetworkIoMessage::AddHandler {
ref handler,
@@ -1009,6 +1045,6 @@ fn host_client_url() {
let mut config = NetworkConfiguration::new();
let key = h256_from_hex("6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2");
config.use_secret = Some(key);
let host: Host<u32> = Host::new(config).unwrap();
let host: Host<u32> = Host::new(config, Arc::new(NetworkStats::new())).unwrap();
assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@"));
}

View File

@@ -56,8 +56,9 @@
//! }
//!
//! fn main () {
//! let mut service = NetworkService::<MyMessage>::start(NetworkConfiguration::new_local()).expect("Error creating network service");
//! let mut service = NetworkService::<MyMessage>::new(NetworkConfiguration::new_local()).expect("Error creating network service");
//! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]);
//! service.start().expect("Error starting service");
//!
//! // Wait for quit condition
//! // ...

View File

@@ -28,33 +28,33 @@ use io::*;
pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static {
io_service: IoService<NetworkIoMessage<Message>>,
host_info: String,
host: Arc<Host<Message>>,
host: RwLock<Option<Arc<Host<Message>>>>,
stats: Arc<NetworkStats>,
panic_handler: Arc<PanicHandler>
panic_handler: Arc<PanicHandler>,
config: NetworkConfiguration,
}
impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'static {
/// Starts IO event loop
pub fn start(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> {
pub fn new(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> {
let panic_handler = PanicHandler::new_in_arc();
let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
let io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
panic_handler.forward_from(&io_service);
let host = Arc::new(try!(Host::new(config)));
let stats = host.stats().clone();
let host_info = host.client_version();
try!(io_service.register_handler(host.clone()));
let stats = Arc::new(NetworkStats::new());
let host_info = Host::<Message>::client_version();
Ok(NetworkService {
io_service: io_service,
host_info: host_info,
stats: stats,
panic_handler: panic_handler,
host: host,
host: RwLock::new(None),
config: config,
})
}
/// Regiter a new protocol handler with the event loop.
pub fn register_protocol(&mut self, handler: Arc<NetworkProtocolHandler<Message>+Send + Sync>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> {
pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler<Message>+Send + Sync>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> {
try!(self.io_service.send_message(NetworkIoMessage::AddHandler {
handler: handler,
protocol: protocol,
@@ -69,8 +69,8 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
}
/// Returns underlying io service.
pub fn io(&mut self) -> &mut IoService<NetworkIoMessage<Message>> {
&mut self.io_service
pub fn io(&self) -> &IoService<NetworkIoMessage<Message>> {
&self.io_service
}
/// Returns network statistics.
@@ -80,12 +80,36 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
/// Returns external url if available.
pub fn external_url(&self) -> Option<String> {
self.host.external_url()
let host = self.host.read().unwrap();
host.as_ref().and_then(|h| h.external_url())
}
/// Returns external url if available.
pub fn local_url(&self) -> String {
self.host.local_url()
pub fn local_url(&self) -> Option<String> {
let host = self.host.read().unwrap();
host.as_ref().map(|h| h.local_url())
}
/// Start network IO
pub fn start(&self) -> Result<(), UtilError> {
let mut host = self.host.write().unwrap();
if host.is_none() {
let h = Arc::new(try!(Host::new(self.config.clone(), self.stats.clone())));
try!(self.io_service.register_handler(h.clone()));
*host = Some(h);
}
Ok(())
}
/// Stop network IO
pub fn stop(&self) -> Result<(), UtilError> {
let mut host = self.host.write().unwrap();
if let Some(ref host) = *host {
let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host
try!(host.stop(&io));
}
*host = None;
Ok(())
}
}

View File

@@ -65,7 +65,7 @@ impl NetworkStats {
self.sessions.load(Ordering::Relaxed)
}
#[cfg(test)]
/// Create a new empty instance.
pub fn new() -> NetworkStats {
NetworkStats {
recv: AtomicUsize::new(0),

View File

@@ -97,7 +97,8 @@ impl NetworkProtocolHandler<TestProtocolMessage> for TestProtocol {
#[test]
fn net_service() {
let mut service = NetworkService::<TestProtocolMessage>::start(NetworkConfiguration::new_local()).expect("Error creating network service");
let service = NetworkService::<TestProtocolMessage>::new(NetworkConfiguration::new_local()).expect("Error creating network service");
service.start().unwrap();
service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap();
}
@@ -108,12 +109,14 @@ fn net_connect() {
let mut config1 = NetworkConfiguration::new_local();
config1.use_secret = Some(key1.secret().clone());
config1.boot_nodes = vec![ ];
let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap();
let mut service1 = NetworkService::<TestProtocolMessage>::new(config1).unwrap();
service1.start().unwrap();
let handler1 = TestProtocol::register(&mut service1, false);
let mut config2 = NetworkConfiguration::new_local();
info!("net_connect: local URL: {}", service1.local_url());
config2.boot_nodes = vec![ service1.local_url() ];
let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap();
info!("net_connect: local URL: {}", service1.local_url().unwrap());
config2.boot_nodes = vec![ service1.local_url().unwrap() ];
let mut service2 = NetworkService::<TestProtocolMessage>::new(config2).unwrap();
service2.start().unwrap();
let handler2 = TestProtocol::register(&mut service2, false);
while !handler1.got_packet() && !handler2.got_packet() && (service1.stats().sessions() == 0 || service2.stats().sessions() == 0) {
thread::sleep(Duration::from_millis(50));
@@ -122,17 +125,28 @@ fn net_connect() {
assert!(service2.stats().sessions() >= 1);
}
#[test]
fn net_start_stop() {
let config = NetworkConfiguration::new_local();
let service = NetworkService::<TestProtocolMessage>::new(config).unwrap();
service.start().unwrap();
service.stop().unwrap();
service.start().unwrap();
}
#[test]
fn net_disconnect() {
let key1 = KeyPair::create().unwrap();
let mut config1 = NetworkConfiguration::new_local();
config1.use_secret = Some(key1.secret().clone());
config1.boot_nodes = vec![ ];
let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap();
let mut service1 = NetworkService::<TestProtocolMessage>::new(config1).unwrap();
service1.start().unwrap();
let handler1 = TestProtocol::register(&mut service1, false);
let mut config2 = NetworkConfiguration::new_local();
config2.boot_nodes = vec![ service1.local_url() ];
let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap();
config2.boot_nodes = vec![ service1.local_url().unwrap() ];
let mut service2 = NetworkService::<TestProtocolMessage>::new(config2).unwrap();
service2.start().unwrap();
let handler2 = TestProtocol::register(&mut service2, true);
while !(handler1.got_disconnect() && handler2.got_disconnect()) {
thread::sleep(Duration::from_millis(50));
@@ -144,7 +158,8 @@ fn net_disconnect() {
#[test]
fn net_timeout() {
let config = NetworkConfiguration::new_local();
let mut service = NetworkService::<TestProtocolMessage>::start(config).unwrap();
let mut service = NetworkService::<TestProtocolMessage>::new(config).unwrap();
service.start().unwrap();
let handler = TestProtocol::register(&mut service, false);
while !handler.got_timeout() {
thread::sleep(Duration::from_millis(50));