diff --git a/Cargo.lock b/Cargo.lock index e75e1bc76..8f47ee5cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,7 +164,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", - "url 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "url 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -278,7 +278,7 @@ dependencies = [ "clippy 0.0.77 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-rpc 1.3.0", "ethcore-util 1.3.0", - "hyper 0.9.3 (git+https://github.com/ethcore/hyper)", + "hyper 0.9.4 (git+https://github.com/ethcore/hyper)", "jsonrpc-core 2.0.7 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-http-server 5.1.0 (git+https://github.com/ethcore/jsonrpc-http-server.git)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -292,7 +292,7 @@ dependencies = [ "serde_codegen 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "syntex 0.33.0 (registry+https://github.com/rust-lang/crates.io-index)", - "url 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "url 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -366,10 +366,10 @@ dependencies = [ "ethcore-util 1.3.0", "jsonrpc-core 2.0.7 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "parity-minimal-sysui 0.1.0 (git+https://github.com/ethcore/parity-dapps-minimal-sysui-rs.git)", + "parity-minimal-sysui 0.2.0 (git+https://github.com/ethcore/parity-dapps-minimal-sysui-rs.git)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", - "ws 0.5.1 (git+https://github.com/ethcore/ws-rs.git)", + "ws 0.5.0 (git+https://github.com/ethcore/ws-rs.git?branch=stable)", ] [[package]] @@ -546,23 +546,22 @@ dependencies = [ [[package]] name = "hyper" -version = "0.9.3" -source = "git+https://github.com/ethcore/hyper#dbb4cf160ebf242f7f0459d547c40e9e6877ccf4" +version = "0.9.4" +source = "git+https://github.com/ethcore/hyper#7ccfcb2aa7e6aa6300efa8cebd6a0e6ce55582ea" dependencies = [ "cookie 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mime 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", - "rotor 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)", + "rotor 0.6.3 (git+https://github.com/ethcore/rotor)", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "spmc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", "traitobject 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "typeable 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "unicase 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "url 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "url 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "vecio 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -630,9 +629,9 @@ dependencies = [ [[package]] name = "jsonrpc-http-server" version = "5.1.0" -source = "git+https://github.com/ethcore/jsonrpc-http-server.git#0c99d308bc15e8fae50642eff77a3e1fd7610652" +source = "git+https://github.com/ethcore/jsonrpc-http-server.git#e59c2fbaca499620874023755cb91f05b858ffe7" dependencies = [ - "hyper 0.9.3 (git+https://github.com/ethcore/hyper)", + "hyper 0.9.4 (git+https://github.com/ethcore/hyper)", "jsonrpc-core 2.0.7 (registry+https://github.com/rust-lang/crates.io-index)", "unicase 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -712,26 +711,10 @@ dependencies = [ "unicase 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "mio" -version = "0.5.0" -source = "git+https://github.com/carllerche/mio.git#f4aa49a9d2c4507fb33a4533d5238e0365f67c99" -dependencies = [ - "bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "miow 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "net2 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)", - "nix 0.5.0-pre (git+https://github.com/carllerche/nix-rust?rev=c4257f8a76)", - "slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", - "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", - "winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "mio" version = "0.5.1" -source = "git+https://github.com/ethcore/mio?branch=v0.5.x#1fc881771fb8c2517317b4f805d7b88235be422b" +source = "git+https://github.com/ethcore/mio?branch=v0.5.x#3842d3b250ffd7bd9b16f9586b875ddcbac2b0dd" dependencies = [ "bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -801,15 +784,6 @@ dependencies = [ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "nix" -version = "0.5.0-pre" -source = "git+https://github.com/carllerche/nix-rust?rev=c4257f8a76#c4257f8a76b69b0d2e9a001d83e4bef67c03b23f" -dependencies = [ - "bitflags 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "nix" version = "0.5.0" @@ -959,8 +933,8 @@ dependencies = [ [[package]] name = "parity-minimal-sysui" -version = "0.1.0" -source = "git+https://github.com/ethcore/parity-dapps-minimal-sysui-rs.git#cc5ea4bd786982f0509a8d3d5deb4217c659af85" +version = "0.2.0" +source = "git+https://github.com/ethcore/parity-dapps-minimal-sysui-rs.git#996c9f3f0ebedc727aecb4ece191154e956ae292" [[package]] name = "phf" @@ -1057,7 +1031,7 @@ dependencies = [ [[package]] name = "quick-error" -version = "0.2.2" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -1121,12 +1095,11 @@ dependencies = [ [[package]] name = "rotor" version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/ethcore/rotor#e63d45137b2eb66d1e085a7c6321a5db8b187576" dependencies = [ "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", - "quick-error 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.5.1 (git+https://github.com/ethcore/mio?branch=v0.5.x)", + "quick-error 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1408,7 +1381,7 @@ dependencies = [ [[package]] name = "url" -version = "1.0.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "idna 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1472,16 +1445,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "ws" -version = "0.5.1" -source = "git+https://github.com/ethcore/ws-rs.git#d5745df8ea1ab82cd2b844f15ca1ac759e7aa9f5" +version = "0.5.0" +source = "git+https://github.com/ethcore/ws-rs.git?branch=stable#e2452450c830618aed30db02e63f3a68710cc40e" dependencies = [ "httparse 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.5.0 (git+https://github.com/carllerche/mio.git)", + "mio 0.5.1 (git+https://github.com/ethcore/mio?branch=v0.5.x)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "sha1 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", - "url 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "url 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/dapps/src/endpoint.rs b/dapps/src/endpoint.rs index 592bc7f8f..075211e58 100644 --- a/dapps/src/endpoint.rs +++ b/dapps/src/endpoint.rs @@ -42,11 +42,11 @@ pub struct EndpointInfo { pub trait Endpoint : Send + Sync { fn info(&self) -> Option<&EndpointInfo> { None } - fn to_handler(&self, path: EndpointPath) -> Box>; + fn to_handler(&self, path: EndpointPath) -> Box + Send>; } pub type Endpoints = BTreeMap>; -pub type Handler = server::Handler; +pub type Handler = server::Handler + Send; pub struct ContentHandler { content: String, @@ -65,7 +65,7 @@ impl ContentHandler { } impl server::Handler for ContentHandler { - fn on_request(&mut self, _request: server::Request) -> Next { + fn on_request(&mut self, _request: server::Request) -> Next { Next::write() } diff --git a/dapps/src/lib.rs b/dapps/src/lib.rs index a7fbd5963..22b075afd 100644 --- a/dapps/src/lib.rs +++ b/dapps/src/lib.rs @@ -132,9 +132,16 @@ impl Server { special.clone(), authorization.clone(), )) - .map(|l| Server { - server: Some(l), - panic_handler: panic_handler, + .map(|(l, srv)| { + + ::std::thread::spawn(move || { + srv.run(); + }); + + Server { + server: Some(l), + panic_handler: panic_handler, + } }) .map_err(ServerError::from) } diff --git a/dapps/src/page/handler.rs b/dapps/src/page/handler.rs index 5167c85c8..c3ec354e0 100644 --- a/dapps/src/page/handler.rs +++ b/dapps/src/page/handler.rs @@ -86,7 +86,7 @@ impl PageHandler { } impl server::Handler for PageHandler { - fn on_request(&mut self, req: server::Request) -> Next { + fn on_request(&mut self, req: server::Request) -> Next { self.file = match *req.uri() { RequestUri::AbsolutePath(ref path) => { self.app.file(&self.extract_path(path)) diff --git a/dapps/src/router/auth.rs b/dapps/src/router/auth.rs index d9933b51c..9459d0719 100644 --- a/dapps/src/router/auth.rs +++ b/dapps/src/router/auth.rs @@ -27,13 +27,13 @@ pub enum Authorized { /// Authorization was successful. Yes, /// Unsuccessful authorization. Handler for further work is returned. - No(Box>), + No(Box + Send>), } /// Authorization interface pub trait Authorization : Send + Sync { /// Checks if authorization is valid. - fn is_authorized(&self, req: &server::Request)-> Authorized; + fn is_authorized(&self, req: &server::Request)-> Authorized; } /// HTTP Basic Authorization handler @@ -45,13 +45,13 @@ pub struct HttpBasicAuth { pub struct NoAuth; impl Authorization for NoAuth { - fn is_authorized(&self, _req: &server::Request)-> Authorized { + fn is_authorized(&self, _req: &server::Request)-> Authorized { Authorized::Yes } } impl Authorization for HttpBasicAuth { - fn is_authorized(&self, req: &server::Request) -> Authorized { + fn is_authorized(&self, req: &server::Request) -> Authorized { let auth = self.check_auth(&req); match auth { @@ -89,7 +89,7 @@ impl HttpBasicAuth { self.users.get(&username.to_owned()).map_or(false, |pass| pass == password) } - fn check_auth(&self, req: &server::Request) -> Access { + fn check_auth(&self, req: &server::Request) -> Access { match req.headers().get::>() { Some(&header::Authorization( header::Basic { ref username, password: Some(ref password) } @@ -105,7 +105,7 @@ pub struct UnauthorizedHandler { } impl server::Handler for UnauthorizedHandler { - fn on_request(&mut self, _request: server::Request) -> Next { + fn on_request(&mut self, _request: server::Request) -> Next { Next::write() } @@ -141,7 +141,7 @@ impl server::Handler for UnauthorizedHandler { pub struct AuthRequiredHandler; impl server::Handler for AuthRequiredHandler { - fn on_request(&mut self, _request: server::Request) -> Next { + fn on_request(&mut self, _request: server::Request) -> Next { Next::write() } diff --git a/dapps/src/router/mod.rs b/dapps/src/router/mod.rs index 12738d79d..0cb0d38d0 100644 --- a/dapps/src/router/mod.rs +++ b/dapps/src/router/mod.rs @@ -49,12 +49,12 @@ pub struct Router { endpoints: Arc, special: Arc>>, authorization: Arc, - handler: Box>, + handler: Box + Send>, } impl server::Handler for Router { - fn on_request(&mut self, req: server::Request) -> Next { + fn on_request(&mut self, req: server::Request) -> Next { // Check authorization let auth = self.authorization.is_authorized(&req); @@ -124,7 +124,7 @@ impl Router { } } -fn extract_url(req: &server::Request) -> Option { +fn extract_url(req: &server::Request) -> Option { match *req.uri() { uri::RequestUri::AbsoluteUri(ref url) => { match Url::from_generic_url(url.clone()) { diff --git a/dapps/src/router/redirect.rs b/dapps/src/router/redirect.rs index 6faf9d0c8..6d738115d 100644 --- a/dapps/src/router/redirect.rs +++ b/dapps/src/router/redirect.rs @@ -33,7 +33,7 @@ impl Redirection { } impl server::Handler for Redirection { - fn on_request(&mut self, _request: server::Request) -> Next { + fn on_request(&mut self, _request: server::Request) -> Next { Next::write() } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index b69558d52..8a294f4fd 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -305,7 +305,7 @@ impl Client where V: Verifier { let closed_block = self.check_and_close_block(&block); if let Err(_) = closed_block { invalid_blocks.insert(header.hash()); - break; + continue; } imported_blocks.push(header.hash()); @@ -368,7 +368,7 @@ impl Client where V: Verifier { invalid: invalid_blocks, enacted: enacted, retracted: retracted, - })).unwrap(); + })).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); } } diff --git a/signer/Cargo.toml b/signer/Cargo.toml index e7d6dcd27..4ef8bb03e 100644 --- a/signer/Cargo.toml +++ b/signer/Cargo.toml @@ -15,10 +15,10 @@ rand = "0.3.14" jsonrpc-core = "2.0" log = "0.3" env_logger = "0.3" -ws = { git = "https://github.com/ethcore/ws-rs.git" } +ws = { git = "https://github.com/ethcore/ws-rs.git", branch = "stable" } ethcore-util = { path = "../util" } ethcore-rpc = { path = "../rpc" } -parity-minimal-sysui = { git = "https://github.com/ethcore/parity-dapps-minimal-sysui-rs.git" } +parity-minimal-sysui = { git = "https://github.com/ethcore/parity-dapps-minimal-sysui-rs.git", version = "0.2.0" } clippy = { version = "0.0.77", optional = true} diff --git a/signer/src/ws_server/mod.rs b/signer/src/ws_server/mod.rs index b6b7c15f7..d2d838b2c 100644 --- a/signer/src/ws_server/mod.rs +++ b/signer/src/ws_server/mod.rs @@ -110,7 +110,17 @@ impl Server { // Spawn a thread with event loop let handle = thread::spawn(move || { ph.catch_panic(move || { - ws.listen(addr).unwrap() + match ws.listen(addr).map_err(ServerError::from) { + Err(ServerError::IoError(io)) => die(format!( + "Signer: Could not start listening on specified address. Make sure that no other instance is running on Signer's port. Details: {:?}", + io + )), + Err(any_error) => die(format!( + "Signer: Unknown error occured when starting Signer. Details: {:?}", + any_error + )), + Ok(server) => server, + } }).unwrap() }); @@ -123,7 +133,11 @@ impl Server { // TODO [ToDr] Some better structure here for messages. broadcaster.send("new_message").unwrap(); }).expect("It's the only place we are running start_listening. It shouldn't fail."); - broadcaster.shutdown().expect("Broadcaster should close gently.") + let res = broadcaster.shutdown(); + + if let Err(e) = res { + warn!("Signer: Broadcaster was not closed cleanly. Details: {:?}", e); + } }).unwrap() }); @@ -148,5 +162,11 @@ impl Drop for Server { self.queue.finish(); self.broadcaster_handle.take().unwrap().join().unwrap(); self.handle.take().unwrap().join().unwrap(); + } } + +fn die(msg: String) -> ! { + println!("ERROR: {}", msg); + std::process::exit(1); +} diff --git a/signer/src/ws_server/session.rs b/signer/src/ws_server/session.rs index b635f1524..4b2eb808c 100644 --- a/signer/src/ws_server/session.rs +++ b/signer/src/ws_server/session.rs @@ -62,6 +62,19 @@ fn auth_is_valid(codes: &Path, protocols: ws::Result>) -> bool { } } +fn add_headers(mut response: ws::Response, mime: &str) -> ws::Response { + let content_len = format!("{}", response.len()); + { + let mut headers = response.headers_mut(); + headers.push(("X-Frame-Options".into(), b"SAMEORIGIN".to_vec())); + headers.push(("Server".into(), b"Parity/SignerUI".to_vec())); + headers.push(("Content-Length".into(), content_len.as_bytes().to_vec())); + headers.push(("Content-Type".into(), mime.as_bytes().to_vec())); + headers.push(("Connection".into(), b"close".to_vec())); + } + response +} + pub struct Session { out: ws::Sender, self_origin: String, @@ -98,26 +111,13 @@ impl ws::Handler for Session { } // Otherwise try to serve a page. - sysui::handle(req.resource()) + Ok(sysui::handle(req.resource()) .map_or_else( - // return error - || Ok(ws::Response::not_found("Page not found".into())), + // return 404 not found + || add_headers(ws::Response::not_found("Not found".into()), "text/plain"), // or serve the file - |f| { - let content_len = format!("{}", f.content.as_bytes().len()); - let mut res = ws::Response::ok(f.content.into()); - { - let mut headers = res.headers_mut(); - headers.push(("Server".into(), b"Parity/SignerUI".to_vec())); - headers.push(("Connection".into(), b"Closed".to_vec())); - headers.push(("Content-Length".into(), content_len.as_bytes().to_vec())); - headers.push(("Content-Type".into(), f.mime.as_bytes().to_vec())); - if !f.safe_to_embed { - headers.push(("X-Frame-Options".into(), b"SAMEORIGIN".to_vec())); - } - } - Ok(res) - }) + |f| add_headers(ws::Response::ok(f.content.into()), &f.mime) + )) } fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> { diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 3f899c261..462d4783e 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -160,11 +160,13 @@ impl SyncProvider for EthSync { } fn start_network(&self) { - self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StartNetwork)).expect("Error sending IO notification"); + self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StartNetwork)) + .unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); } fn stop_network(&self) { - self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StopNetwork)).expect("Error sending IO notification"); + self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StopNetwork)) + .unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); } } diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 65be55540..07e60f766 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -135,8 +135,9 @@ impl IoContext where Message: Send + Clone + 'static { } /// Broadcast a message to other IO clients - pub fn message(&self, message: Message) { - self.channel.send(message).expect("Error seding message"); + pub fn message(&self, message: Message) -> Result<(), UtilError> { + try!(self.channel.send(message)); + Ok(()) } /// Get message channel @@ -351,7 +352,9 @@ impl IoService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop pub fn start() -> Result, UtilError> { let panic_handler = PanicHandler::new_in_arc(); - let mut event_loop = EventLoop::new().unwrap(); + let mut config = EventLoopConfig::new(); + config.messages_per_tick(1024); + let mut event_loop = EventLoop::configured(config).expect("Error creating event loop"); let channel = event_loop.channel(); let panic = panic_handler.clone(); let thread = thread::spawn(move || { @@ -390,7 +393,7 @@ impl IoService where Message: Send + Sync + Clone + 'static { impl Drop for IoService where Message: Send + Sync + Clone { fn drop(&mut self) { trace!(target: "shutdown", "[IoService] Closing..."); - self.host_channel.send(IoMessage::Shutdown).unwrap(); + self.host_channel.send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e)); self.thread.take().unwrap().join().ok(); trace!(target: "shutdown", "[IoService] Closed."); } diff --git a/util/src/network/discovery.rs b/util/src/network/discovery.rs index b67110538..d4f895702 100644 --- a/util/src/network/discovery.rs +++ b/util/src/network/discovery.rs @@ -28,7 +28,7 @@ use crypto::*; use rlp::*; use network::node_table::*; use network::error::NetworkError; -use io::StreamToken; +use io::{StreamToken, IoContext}; use network::PROTOCOL_VERSION; @@ -283,7 +283,7 @@ impl Discovery { ret } - pub fn writable(&mut self) { + pub fn writable(&mut self, io: &IoContext) where Message: Send + Sync + Clone { while !self.send_queue.is_empty() { let data = self.send_queue.pop_front().unwrap(); match self.udp_socket.send_to(&data.payload, &data.address) { @@ -302,15 +302,17 @@ impl Discovery { } } } + io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); } fn send_to(&mut self, payload: Bytes, address: SocketAddr) { self.send_queue.push_back(Datagramm { payload: payload, address: address }); } - pub fn readable(&mut self) -> Option { + pub fn readable(&mut self, io: &IoContext) -> Option where Message: Send + Sync + Clone { let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() }; - match self.udp_socket.recv_from(&mut buf) { + let writable = !self.send_queue.is_empty(); + let res = match self.udp_socket.recv_from(&mut buf) { Ok(Some((len, address))) => self.on_packet(&buf[0..len], address).unwrap_or_else(|e| { debug!("Error processing UDP packet: {:?}", e); None @@ -320,7 +322,12 @@ impl Discovery { debug!("Error reading UPD socket: {:?}", e); None } + }; + let new_writable = !self.send_queue.is_empty(); + if writable != new_writable { + io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); } + res } fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result, NetworkError> { diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 03f37fa61..46185482f 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -236,8 +236,8 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone } /// Send an IO message - pub fn message(&self, msg: Message) { - self.io.message(NetworkIoMessage::User(msg)); + pub fn message(&self, msg: Message) -> Result<(), UtilError> { + self.io.message(NetworkIoMessage::User(msg)) } /// Get an IoChannel. @@ -248,12 +248,14 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone /// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected. pub fn disable_peer(&self, peer: PeerId) { //TODO: remove capability, disconnect if no capabilities left - self.io.message(NetworkIoMessage::DisablePeer(peer)); + self.io.message(NetworkIoMessage::DisablePeer(peer)) + .unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e)); } /// Disconnect peer. Reconnect can be attempted later. pub fn disconnect_peer(&self, peer: PeerId) { - self.io.message(NetworkIoMessage::Disconnect(peer)); + self.io.message(NetworkIoMessage::Disconnect(peer)) + .unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e)); } /// Check if the session is still active. @@ -267,7 +269,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone token: token, delay: ms, protocol: self.protocol, - }); + }).unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e)); Ok(()) } @@ -714,7 +716,6 @@ impl Host where Message: Send + Sync + Clone { debug!(target: "network", "Can't accept connection: {:?}", e); } } - io.update_registration(TCP_ACCEPT).expect("Error registering TCP listener"); } fn session_writable(&self, token: StreamToken, io: &IoContext>) { @@ -910,11 +911,10 @@ impl IoHandler> for Host where Messa match stream { FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io), DISCOVERY => { - let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().readable() }; + let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().readable(io) }; if let Some(node_changes) = node_changes { self.update_nodes(io, node_changes); } - io.update_registration(DISCOVERY).expect("Error updating discovery registration"); }, TCP_ACCEPT => self.accept(io), _ => panic!("Received unknown readable token"), @@ -928,8 +928,7 @@ impl IoHandler> for Host where Messa match stream { FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io), DISCOVERY => { - self.discovery.lock().unwrap().as_mut().unwrap().writable(); - io.update_registration(DISCOVERY).expect("Error updating discovery registration"); + self.discovery.lock().unwrap().as_mut().unwrap().writable(io); } _ => panic!("Received unknown writable token"), } @@ -946,14 +945,14 @@ impl IoHandler> for Host where Messa FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io), DISCOVERY_REFRESH => { self.discovery.lock().unwrap().as_mut().unwrap().refresh(); - io.update_registration(DISCOVERY).expect("Error updating discovery registration"); + io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); }, DISCOVERY_ROUND => { let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().round() }; if let Some(node_changes) = node_changes { self.update_nodes(io, node_changes); } - io.update_registration(DISCOVERY).expect("Error updating discovery registration"); + io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); }, NODE_TABLE => { trace!(target: "network", "Refreshing node table"); @@ -1004,7 +1003,7 @@ impl IoHandler> for Host where Messa handler_token }; self.timers.write().unwrap().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token }); - io.register_timer(handler_token, *delay).expect("Error registering timer"); + io.register_timer(handler_token, *delay).unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e)); }, NetworkIoMessage::Disconnect(ref peer) => { let session = { self.sessions.read().unwrap().get(*peer).cloned() };