Merge pull request #893 from ethcore/closing

Additional logging and friendlier error messages
This commit is contained in:
Arkadiy Paronyan 2016-04-07 12:36:19 +02:00
commit 123a0f0b40
7 changed files with 70 additions and 42 deletions

6
Cargo.lock generated
View File

@ -246,7 +246,7 @@ dependencies = [
"ethminer 1.1.0",
"ethsync 1.1.0",
"jsonrpc-core 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-http-server 3.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-http-server 4.0.0 (git+https://github.com/tomusdrw/jsonrpc-http-server.git)",
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -453,8 +453,8 @@ dependencies = [
[[package]]
name = "jsonrpc-http-server"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
version = "4.0.0"
source = "git+https://github.com/tomusdrw/jsonrpc-http-server.git#46bd4e7cf8352e0efc940cf76d3dff99f1a3da15"
dependencies = [
"hyper 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -442,12 +442,14 @@ impl MayPanic for BlockQueue {
impl Drop for BlockQueue {
fn drop(&mut self) {
trace!(target: "shutdown", "[BlockQueue] Closing...");
self.clear();
self.deleting.store(true, AtomicOrdering::Release);
self.more_to_verify.notify_all();
for t in self.verifiers.drain(..) {
t.join().unwrap();
}
trace!(target: "shutdown", "[BlockQueue] Closed.");
}
}

View File

@ -270,10 +270,8 @@ fn setup_rpc_server(
sync: Arc<EthSync>,
secret_store: Arc<AccountService>,
miner: Arc<Miner>,
url: &str,
cors_domain: &str,
apis: Vec<&str>
) -> Option<Arc<PanicHandler>> {
) -> Option<rpc::RpcServer> {
use rpc::v1::*;
let server = rpc::RpcServer::new();
@ -291,7 +289,7 @@ fn setup_rpc_server(
}
}
}
Some(server.start_http(url, cors_domain, ::num_cpus::get()))
Some(server)
}
#[cfg(not(feature = "rpc"))]
@ -300,8 +298,6 @@ fn setup_rpc_server(
_sync: Arc<EthSync>,
_secret_store: Arc<AccountService>,
_miner: Arc<Miner>,
_url: &str,
_cors_domain: &str,
_apis: Vec<&str>
) -> Option<Arc<PanicHandler>> {
None
@ -567,7 +563,10 @@ impl Configuration {
let account_service = Arc::new(self.account_service());
// Build client
let mut service = ClientService::start(self.client_config(), spec, net_settings, &Path::new(&self.path())).unwrap();
let mut service = ClientService::start(
self.client_config(), spec, net_settings, &Path::new(&self.path())
).unwrap_or_else(|e| die_with_error(e));
panic_handler.forward_from(&service);
let client = service.client();
@ -582,7 +581,21 @@ impl Configuration {
let sync = EthSync::register(service.network(), sync_config, client.clone(), miner.clone());
// Setup rpc
if self.args.flag_jsonrpc || self.args.flag_rpc {
let rpc_server = if self.args.flag_jsonrpc || self.args.flag_rpc {
// TODO: use this as the API list.
let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis);
setup_rpc_server(
service.client(),
sync.clone(),
account_service.clone(),
miner.clone(),
apis.split(',').collect()
)
} else {
None
};
let rpc_handle = rpc_server.map(|server| {
let url = format!("{}:{}",
match self.args.flag_rpcaddr.as_ref().unwrap_or(&self.args.flag_jsonrpc_interface).as_str() {
"all" => "0.0.0.0",
@ -592,22 +605,14 @@ impl Configuration {
self.args.flag_rpcport.unwrap_or(self.args.flag_jsonrpc_port)
);
SocketAddr::from_str(&url).unwrap_or_else(|_| die!("{}: Invalid JSONRPC listen host/port given.", url));
let cors = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors);
// TODO: use this as the API list.
let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis);
let server_handler = setup_rpc_server(
service.client(),
sync.clone(),
account_service.clone(),
miner.clone(),
&url,
cors,
apis.split(',').collect()
);
if let Some(handler) = server_handler {
panic_handler.forward_from(handler.deref());
let cors_domain = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors);
let start_result = server.start_http(&url, cors_domain, ::num_cpus::get());
match start_result {
Ok(handle) => handle,
Err(rpc::RpcServerError::IoError(err)) => die_with_io_error(err),
Err(e) => die!("{:?}", e),
}
}
});
// Register IO handler
let io_handler = Arc::new(ClientIoHandler {
@ -619,11 +624,11 @@ impl Configuration {
service.io().register_handler(io_handler).expect("Error registering IO handler");
// Handle exit
wait_for_exit(panic_handler);
wait_for_exit(panic_handler, rpc_handle);
}
}
fn wait_for_exit(panic_handler: Arc<PanicHandler>) {
fn wait_for_exit(panic_handler: Arc<PanicHandler>, _rpc_handle: Option<rpc::Listening>) {
let exit = Arc::new(Condvar::new());
// Handle possible exits
@ -637,6 +642,30 @@ fn wait_for_exit(panic_handler: Arc<PanicHandler>) {
// Wait for signal
let mutex = Mutex::new(());
let _ = exit.wait(mutex.lock().unwrap()).unwrap();
info!("Finishing work, please wait...");
}
fn die_with_error(e: ethcore::error::Error) -> ! {
use ethcore::error::Error;
match e {
Error::Util(UtilError::StdIo(e)) => die_with_io_error(e),
_ => die!("{:?}", e),
}
}
fn die_with_io_error(e: std::io::Error) -> ! {
match e.kind() {
std::io::ErrorKind::PermissionDenied => {
die!("No permissions to bind to specified port.")
},
std::io::ErrorKind::AddrInUse => {
die!("Specified address is already in use. Please make sure that nothing is listening on the same port or try using a different one.")
},
std::io::ErrorKind::AddrNotAvailable => {
die!("Could not use specified interface or given address is invalid.")
},
_ => die!("{:?}", e),
}
}
fn main() {

View File

@ -13,7 +13,7 @@ log = "0.3"
serde = "0.7.0"
serde_json = "0.7.0"
jsonrpc-core = "2.0"
jsonrpc-http-server = "3.0"
jsonrpc-http-server = { git = "https://github.com/tomusdrw/jsonrpc-http-server.git" }
ethcore-util = { path = "../util" }
ethcore = { path = "../ethcore" }
ethash = { path = "../ethash" }

View File

@ -33,10 +33,9 @@ extern crate ethminer;
extern crate transient_hashmap;
use std::sync::Arc;
use std::thread;
use util::panics::PanicHandler;
use self::jsonrpc_core::{IoHandler, IoDelegate};
pub use jsonrpc_http_server::{Listening, RpcServerError};
pub mod v1;
/// Http server.
@ -45,7 +44,7 @@ pub struct RpcServer {
}
impl RpcServer {
/// Construct new http server object with given number of threads.
/// Construct new http server object.
pub fn new() -> RpcServer {
RpcServer {
handler: Arc::new(IoHandler::new()),
@ -57,18 +56,12 @@ impl RpcServer {
self.handler.add_delegate(delegate);
}
/// Start server asynchronously in new thread and returns panic handler.
pub fn start_http(&self, addr: &str, cors_domain: &str, threads: usize) -> Arc<PanicHandler> {
/// Start server asynchronously and returns result with `Listening` handle on success or an error.
pub fn start_http(&self, addr: &str, cors_domain: &str, threads: usize) -> Result<Listening, RpcServerError> {
let addr = addr.to_owned();
let cors_domain = cors_domain.to_owned();
let panic_handler = PanicHandler::new_in_arc();
let ph = panic_handler.clone();
let server = jsonrpc_http_server::Server::new(self.handler.clone());
thread::Builder::new().name("jsonrpc_http".to_string()).spawn(move || {
ph.catch_panic(move || {
server.start(addr.as_ref(), jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain), threads);
}).unwrap()
}).expect("Error while creating jsonrpc http thread");
panic_handler
server.start(addr.as_ref(), jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain), threads)
}
}

View File

@ -376,8 +376,10 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
impl<Message> Drop for IoService<Message> where Message: Send + Sync + Clone {
fn drop(&mut self) {
trace!(target: "shutdown", "[IoService] Closing...");
self.host_channel.send(IoMessage::Shutdown).unwrap();
self.thread.take().unwrap().join().ok();
trace!(target: "shutdown", "[IoService] Closed.");
}
}

View File

@ -120,10 +120,12 @@ impl Worker {
impl Drop for Worker {
fn drop(&mut self) {
trace!(target: "shutdown", "[IoWorker] Closing...");
let _ = self.wait_mutex.lock();
self.deleting.store(true, AtomicOrdering::Release);
self.wait.notify_all();
let thread = mem::replace(&mut self.thread, None).unwrap();
thread.join().ok();
trace!(target: "shutdown", "[IoWorker] Closed");
}
}