diff --git a/Cargo.lock b/Cargo.lock index 62093c293..af361da49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -264,7 +264,7 @@ dependencies = [ "ethminer 1.1.0", "ethsync 1.1.0", "jsonrpc-core 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-http-server 3.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-http-server 4.0.0 (git+https://github.com/tomusdrw/jsonrpc-http-server.git)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "serde 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -502,16 +502,6 @@ dependencies = [ "syntex 0.30.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "jsonrpc-http-server" -version = "3.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "hyper 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "unicase 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "jsonrpc-http-server" version = "4.0.0" diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index d9f629f11..bc1da2f8f 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -442,12 +442,14 @@ impl MayPanic for BlockQueue { impl Drop for BlockQueue { fn drop(&mut self) { + trace!(target: "shutdown", "[BlockQueue] Closing..."); self.clear(); self.deleting.store(true, AtomicOrdering::Release); self.more_to_verify.notify_all(); for t in self.verifiers.drain(..) { t.join().unwrap(); } + trace!(target: "shutdown", "[BlockQueue] Closed."); } } diff --git a/parity/main.rs b/parity/main.rs index c047e5eb3..062d62244 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -45,6 +45,7 @@ extern crate ethcore_rpc as rpc; #[cfg(feature = "webapp")] extern crate ethcore_webapp as webapp; +use std::any::Any; use std::io::{BufRead, BufReader}; use std::fs::File; use std::net::{SocketAddr, IpAddr}; @@ -281,7 +282,7 @@ fn setup_rpc_server( url: &str, cors_domain: &str, apis: Vec<&str> -) -> Arc { +) -> Box { use rpc::v1::*; let server = rpc::RpcServer::new(); @@ -299,8 +300,14 @@ fn setup_rpc_server( } } } - server.start_http(url, cors_domain, ::num_cpus::get()) + let start_result = server.start_http(url, cors_domain, ::num_cpus::get()); + match start_result { + Err(rpc::RpcServerError::IoError(err)) => die_with_io_error(err), + Err(e) => die!("{:?}", e), + Ok(handle) => Box::new(handle), + } } + #[cfg(feature = "webapp")] fn setup_webapp_server( client: Arc, @@ -308,7 +315,7 @@ fn setup_webapp_server( secret_store: Arc, miner: Arc, url: &str -) -> Arc { +) -> Box { use rpc::v1::*; let server = webapp::WebappServer::new(); @@ -317,7 +324,13 @@ fn setup_webapp_server( server.add_delegate(EthClient::new(&client, &sync, &secret_store, &miner).to_delegate()); server.add_delegate(EthFilterClient::new(&client, &miner).to_delegate()); server.add_delegate(PersonalClient::new(&secret_store).to_delegate()); - server.start_http(url, ::num_cpus::get()) + let start_result = server.start_http(url, ::num_cpus::get()); + match start_result { + Err(webapp::WebappServerError::IoError(err)) => die_with_io_error(err), + Err(e) => die!("{:?}", e), + Ok(handle) => Box::new(handle), + } + } #[cfg(not(feature = "rpc"))] @@ -329,7 +342,7 @@ fn setup_rpc_server( _url: &str, _cors_domain: &str, _apis: Vec<&str> -) -> Arc { +) -> ! { die!("Your Parity version has been compiled without JSON-RPC support.") } @@ -340,7 +353,7 @@ fn setup_webapp_server( _secret_store: Arc, _miner: Arc, _url: &str -) -> Arc { +) -> ! { die!("Your Parity version has been compiled without WebApps support.") } @@ -604,7 +617,10 @@ impl Configuration { let account_service = Arc::new(self.account_service()); // Build client - let mut service = ClientService::start(self.client_config(), spec, net_settings, &Path::new(&self.path())).unwrap(); + let mut service = ClientService::start( + self.client_config(), spec, net_settings, &Path::new(&self.path()) + ).unwrap_or_else(|e| die_with_error(e)); + panic_handler.forward_from(&service); let client = service.client(); @@ -619,7 +635,8 @@ impl Configuration { let sync = EthSync::register(service.network(), sync_config, client.clone(), miner.clone()); // Setup rpc - if self.args.flag_jsonrpc || self.args.flag_rpc { + let rpc_server = if self.args.flag_jsonrpc || self.args.flag_rpc { + let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis); let url = format!("{}:{}", match self.args.flag_rpcaddr.as_ref().unwrap_or(&self.args.flag_jsonrpc_interface).as_str() { "all" => "0.0.0.0", @@ -629,32 +646,33 @@ impl Configuration { self.args.flag_rpcport.unwrap_or(self.args.flag_jsonrpc_port) ); SocketAddr::from_str(&url).unwrap_or_else(|_| die!("{}: Invalid JSONRPC listen host/port given.", url)); - let cors = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors); - // TODO: use this as the API list. - let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis); - let handler = setup_rpc_server( - service.client(), - sync.clone(), - account_service.clone(), - miner.clone(), - &url, - cors, - apis.split(',').collect() - ); - panic_handler.forward_from(handler.deref()); - } + let cors_domain = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors); - if self.args.flag_webapp { - let url = format!("127.0.0.1:{}", self.args.flag_webapp_port); - let handler = setup_webapp_server( + Some(setup_rpc_server( service.client(), sync.clone(), account_service.clone(), miner.clone(), &url, - ); - panic_handler.forward_from(handler.deref()); - } + &cors_domain, + apis.split(',').collect() + )) + } else { + None + }; + + let webapp_server = if self.args.flag_webapp { + let url = format!("127.0.0.1:{}", self.args.flag_webapp_port); + Some(setup_webapp_server( + service.client(), + sync.clone(), + account_service.clone(), + miner.clone(), + &url, + )) + } else { + None + }; // Register IO handler let io_handler = Arc::new(ClientIoHandler { @@ -666,11 +684,11 @@ impl Configuration { service.io().register_handler(io_handler).expect("Error registering IO handler"); // Handle exit - wait_for_exit(panic_handler); + wait_for_exit(panic_handler, rpc_server, webapp_server); } } -fn wait_for_exit(panic_handler: Arc) { +fn wait_for_exit(panic_handler: Arc, _rpc_server: Option>, _webapp_server: Option>) { let exit = Arc::new(Condvar::new()); // Handle possible exits @@ -684,6 +702,30 @@ fn wait_for_exit(panic_handler: Arc) { // Wait for signal let mutex = Mutex::new(()); let _ = exit.wait(mutex.lock().unwrap()).unwrap(); + info!("Finishing work, please wait..."); +} + +fn die_with_error(e: ethcore::error::Error) -> ! { + use ethcore::error::Error; + + match e { + Error::Util(UtilError::StdIo(e)) => die_with_io_error(e), + _ => die!("{:?}", e), + } +} +fn die_with_io_error(e: std::io::Error) -> ! { + match e.kind() { + std::io::ErrorKind::PermissionDenied => { + die!("No permissions to bind to specified port.") + }, + std::io::ErrorKind::AddrInUse => { + die!("Specified address is already in use. Please make sure that nothing is listening on the same port or try using a different one.") + }, + std::io::ErrorKind::AddrNotAvailable => { + die!("Could not use specified interface or given address is invalid.") + }, + _ => die!("{:?}", e), + } } fn main() { diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 17ddf3702..4c62a45c1 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -13,7 +13,7 @@ log = "0.3" serde = "0.7.0" serde_json = "0.7.0" jsonrpc-core = "2.0" -jsonrpc-http-server = "3.0" +jsonrpc-http-server = { git = "https://github.com/tomusdrw/jsonrpc-http-server.git" } ethcore-util = { path = "../util" } ethcore = { path = "../ethcore" } ethash = { path = "../ethash" } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 3096a45c9..4de405211 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -33,10 +33,9 @@ extern crate ethminer; extern crate transient_hashmap; use std::sync::Arc; -use std::thread; -use util::panics::PanicHandler; use self::jsonrpc_core::{IoHandler, IoDelegate}; +pub use jsonrpc_http_server::{Listening, RpcServerError}; pub mod v1; /// Http server. @@ -45,7 +44,7 @@ pub struct RpcServer { } impl RpcServer { - /// Construct new http server object with given number of threads. + /// Construct new http server object. pub fn new() -> RpcServer { RpcServer { handler: Arc::new(IoHandler::new()), @@ -57,18 +56,12 @@ impl RpcServer { self.handler.add_delegate(delegate); } - /// Start server asynchronously in new thread and returns panic handler. - pub fn start_http(&self, addr: &str, cors_domain: &str, threads: usize) -> Arc { + /// Start server asynchronously and returns result with `Listening` handle on success or an error. + pub fn start_http(&self, addr: &str, cors_domain: &str, threads: usize) -> Result { let addr = addr.to_owned(); let cors_domain = cors_domain.to_owned(); - let panic_handler = PanicHandler::new_in_arc(); - let ph = panic_handler.clone(); let server = jsonrpc_http_server::Server::new(self.handler.clone()); - thread::Builder::new().name("jsonrpc_http".to_string()).spawn(move || { - ph.catch_panic(move || { - server.start(addr.as_ref(), jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain), threads); - }).unwrap() - }).expect("Error while creating jsonrpc http thread"); - panic_handler + + server.start(addr.as_ref(), jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain), threads) } } diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 8a34ee80a..95aa19e47 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -376,8 +376,10 @@ impl IoService where Message: Send + Sync + Clone + 'static { impl Drop for IoService where Message: Send + Sync + Clone { fn drop(&mut self) { + trace!(target: "shutdown", "[IoService] Closing..."); self.host_channel.send(IoMessage::Shutdown).unwrap(); self.thread.take().unwrap().join().ok(); + trace!(target: "shutdown", "[IoService] Closed."); } } diff --git a/util/src/io/worker.rs b/util/src/io/worker.rs index b874ea0a4..917b1ad79 100644 --- a/util/src/io/worker.rs +++ b/util/src/io/worker.rs @@ -120,10 +120,12 @@ impl Worker { impl Drop for Worker { fn drop(&mut self) { + trace!(target: "shutdown", "[IoWorker] Closing..."); let _ = self.wait_mutex.lock(); self.deleting.store(true, AtomicOrdering::Release); self.wait.notify_all(); let thread = mem::replace(&mut self.thread, None).unwrap(); thread.join().ok(); + trace!(target: "shutdown", "[IoWorker] Closed"); } } diff --git a/webapp/src/lib.rs b/webapp/src/lib.rs index 7a3620e1a..6150a2b39 100644 --- a/webapp/src/lib.rs +++ b/webapp/src/lib.rs @@ -25,11 +25,9 @@ extern crate iron; extern crate jsonrpc_core; extern crate jsonrpc_http_server; extern crate ethcore_rpc as rpc; -extern crate ethcore_util as util; extern crate parity_webapp; use std::sync::Arc; -use util::panics::PanicHandler; use self::jsonrpc_core::{IoHandler, IoDelegate}; use jsonrpc_http_server::ServerHandler; @@ -55,22 +53,45 @@ impl WebappServer { self.handler.add_delegate(delegate); } - /// Start server asynchronously and returns panic handler. - pub fn start_http(&self, addr: &str, threads: usize) -> Arc { + /// Start server asynchronously and returns result with `Listening` handle on success or an error. + pub fn start_http(&self, addr: &str, threads: usize) -> Result { let addr = addr.to_owned(); - let panic_handler = PanicHandler::new_in_arc(); let handler = self.handler.clone(); let cors_domain = jsonrpc_http_server::AccessControlAllowOrigin::Null; let rpc = ServerHandler::new(handler, cors_domain); let router = router::Router::new(rpc, apps::all_pages()); - panic_handler.catch_panic(move || { - hyper::Server::http(addr.as_ref() as &str).unwrap() - .handle_threads(router, threads) - .unwrap(); - }).unwrap(); - - panic_handler + try!(hyper::Server::http(addr.as_ref() as &str)) + .handle_threads(router, threads) + .map(|l| Listening { listening: l }) + .map_err(WebappServerError::from) + } +} + +/// Listening handle +pub struct Listening { + listening: hyper::server::Listening +} + +impl Drop for Listening { + fn drop(&mut self) { + self.listening.close().unwrap(); + } +} + +/// Webapp Server startup error +#[derive(Debug)] +pub enum WebappServerError { + IoError(std::io::Error), + Other(hyper::error::Error), +} + +impl From for WebappServerError { + fn from(err: hyper::error::Error) -> Self { + match err { + hyper::error::Error::Io(e) => WebappServerError::IoError(e), + e => WebappServerError::Other(e) + } } }