diff --git a/Cargo.lock b/Cargo.lock index 2ebf9b854..11a8c93dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -390,6 +390,7 @@ name = "ethcore-ipc-nano" version = "1.4.0" dependencies = [ "ethcore-ipc 1.4.0", + "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", ] diff --git a/db/src/database.rs b/db/src/database.rs index 185618f99..9a52822f6 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -460,7 +460,7 @@ mod client_tests { crossbeam::scope(move |scope| { let stop = Arc::new(AtomicBool::new(false)); run_worker(scope, stop.clone(), url); - let client = nanoipc::init_client::>(url).unwrap(); + let client = nanoipc::generic_client::>(url).unwrap(); client.open_default(path.as_str().to_owned()).unwrap(); client.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); client.close().unwrap(); @@ -477,7 +477,7 @@ mod client_tests { crossbeam::scope(move |scope| { let stop = Arc::new(AtomicBool::new(false)); run_worker(scope, stop.clone(), url); - let client = nanoipc::init_client::>(url).unwrap(); + let client = nanoipc::generic_client::>(url).unwrap(); client.open_default(path.as_str().to_owned()).unwrap(); client.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); @@ -498,7 +498,7 @@ mod client_tests { crossbeam::scope(move |scope| { let stop = Arc::new(AtomicBool::new(false)); run_worker(scope, stop.clone(), url); - let client = nanoipc::init_client::>(url).unwrap(); + let client = nanoipc::generic_client::>(url).unwrap(); client.open_default(path.as_str().to_owned()).unwrap(); assert!(client.get("xxx".as_bytes()).unwrap().is_none()); @@ -516,7 +516,7 @@ mod client_tests { crossbeam::scope(move |scope| { let stop = Arc::new(AtomicBool::new(false)); run_worker(scope, stop.clone(), url); - let client = nanoipc::init_client::>(url).unwrap(); + let client = nanoipc::generic_client::>(url).unwrap(); client.open_default(path.as_str().to_owned()).unwrap(); let transaction = DBTransaction::new(); @@ -541,7 +541,7 @@ mod client_tests { let stop = StopGuard::new(); run_worker(&scope, stop.share(), url); - let client = nanoipc::init_client::>(url).unwrap(); + let client = nanoipc::generic_client::>(url).unwrap(); client.open_default(path.as_str().to_owned()).unwrap(); let mut batch = Vec::new(); diff --git a/db/src/lib.rs.in b/db/src/lib.rs.in index 4fa43b977..54fccb097 100644 --- a/db/src/lib.rs.in +++ b/db/src/lib.rs.in @@ -66,13 +66,13 @@ pub fn extras_service_url(db_path: &str) -> Result { pub fn blocks_client(db_path: &str) -> Result { let url = try!(blocks_service_url(db_path)); - let client = try!(nanoipc::init_client::>(&url)); + let client = try!(nanoipc::generic_client::>(&url)); Ok(client) } pub fn extras_client(db_path: &str) -> Result { let url = try!(extras_service_url(db_path)); - let client = try!(nanoipc::init_client::>(&url)); + let client = try!(nanoipc::generic_client::>(&url)); Ok(client) } diff --git a/ethcore/src/tests/rpc.rs b/ethcore/src/tests/rpc.rs index 202e42988..d5d88c087 100644 --- a/ethcore/src/tests/rpc.rs +++ b/ethcore/src/tests/rpc.rs @@ -56,7 +56,7 @@ fn can_handshake() { let stop_guard = StopGuard::new(); let socket_path = "ipc:///tmp/parity-client-rpc-10.ipc"; run_test_worker(scope, stop_guard.share(), socket_path); - let remote_client = nanoipc::init_client::>(socket_path).unwrap(); + let remote_client = nanoipc::generic_client::>(socket_path).unwrap(); assert!(remote_client.handshake().is_ok()); }) @@ -68,7 +68,7 @@ fn can_query_block() { let stop_guard = StopGuard::new(); let socket_path = "ipc:///tmp/parity-client-rpc-20.ipc"; run_test_worker(scope, stop_guard.share(), socket_path); - let remote_client = nanoipc::init_client::>(socket_path).unwrap(); + let remote_client = nanoipc::generic_client::>(socket_path).unwrap(); let non_existant_block = remote_client.block_header(BlockID::Number(999)); diff --git a/ipc/hypervisor/src/lib.rs b/ipc/hypervisor/src/lib.rs index 3cfd464e9..78b8b04ce 100644 --- a/ipc/hypervisor/src/lib.rs +++ b/ipc/hypervisor/src/lib.rs @@ -240,7 +240,7 @@ mod tests { ::std::thread::spawn(move || { while !hypervisor_ready.load(Ordering::Relaxed) { } - let client = nanoipc::init_client::>(url).unwrap(); + let client = nanoipc::fast_client::>(url).unwrap(); client.handshake().unwrap(); client.module_ready(test_module_id); }); diff --git a/ipc/hypervisor/src/service.rs.in b/ipc/hypervisor/src/service.rs.in index 938cea345..74d289f50 100644 --- a/ipc/hypervisor/src/service.rs.in +++ b/ipc/hypervisor/src/service.rs.in @@ -110,7 +110,7 @@ impl HypervisorService { let modules = self.modules.read().unwrap(); modules.get(&module_id).map(|module| { trace!(target: "hypervisor", "Sending shutdown to {}({})", module_id, &module.control_url); - let client = nanoipc::init_client::>(&module.control_url).unwrap(); + let client = nanoipc::fast_client::>(&module.control_url).unwrap(); client.shutdown(); trace!(target: "hypervisor", "Sent shutdown to {}", module_id); }); diff --git a/ipc/nano/Cargo.toml b/ipc/nano/Cargo.toml index ee399e60f..b358eb23a 100644 --- a/ipc/nano/Cargo.toml +++ b/ipc/nano/Cargo.toml @@ -10,4 +10,4 @@ license = "GPL-3.0" ethcore-ipc = { path = "../rpc" } nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" } log = "0.3" - +lazy_static = "0.2" diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index da48151a6..1157e75d3 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -19,6 +19,7 @@ extern crate ethcore_ipc as ipc; extern crate nanomsg; #[macro_use] extern crate log; +#[macro_use] extern crate lazy_static; pub use ipc::{WithSocket, IpcInterface, IpcConfig}; pub use nanomsg::Socket as NanoSocket; @@ -28,7 +29,8 @@ use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut} use std::ops::Deref; const POLL_TIMEOUT: isize = 200; -const CLIENT_CONNECTION_TIMEOUT: isize = 120000; +const DEFAULT_CONNECTION_TIMEOUT: isize = 30000; +const DEBUG_CONNECTION_TIMEOUT: isize = 5000; /// Generic worker to handle service (binded) sockets pub struct Worker where S: IpcInterface { @@ -68,7 +70,7 @@ pub fn init_duplex_client(socket_addr: &str) -> Result, Sock SocketError::DuplexLink })); - socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap(); + socket.set_receive_timeout(DEFAULT_CONNECTION_TIMEOUT).unwrap(); let endpoint = try!(socket.connect(socket_addr).map_err(|e| { warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); @@ -84,26 +86,58 @@ pub fn init_duplex_client(socket_addr: &str) -> Result, Sock /// Spawns client <`S`> over specified address /// creates socket and connects endpoint to it /// for request-reply connections to the service -pub fn init_client(socket_addr: &str) -> Result, SocketError> where S: WithSocket { +pub fn client(socket_addr: &str, receive_timeout: Option) -> Result, SocketError> where S: WithSocket { let mut socket = try!(Socket::new(Protocol::Req).map_err(|e| { warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); SocketError::RequestLink })); - socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap(); + if let Some(timeout) = receive_timeout { + socket.set_receive_timeout(timeout).unwrap(); + } let endpoint = try!(socket.connect(socket_addr).map_err(|e| { warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); SocketError::RequestLink })); - trace!(target: "ipc", "Created cleint for {}", socket_addr); + trace!(target: "ipc", "Created client for {}", socket_addr); Ok(GuardedSocket { client: Arc::new(S::init(socket)), _endpoint: endpoint, }) } +lazy_static! { + /// Set PARITY_IPC_DEBUG=1 for fail-fast connectivity problems diagnostic + pub static ref DEBUG_FLAG: bool = { + use std::env; + + if let Ok(debug) = env::var("PARITY_IPC_DEBUG") { + debug == "1" || debug.to_uppercase() == "TRUE" + } + else { false } + }; +} + +/// Client with no default timeout on operations +pub fn generic_client(socket_addr: &str) -> Result, SocketError> where S: WithSocket { + if *DEBUG_FLAG { + client(socket_addr, Some(DEBUG_CONNECTION_TIMEOUT)) + } else { + client(socket_addr, None) + } +} + +/// Client over interface that is supposed to give quick almost non-blocking responses +pub fn fast_client(socket_addr: &str) -> Result, SocketError> where S: WithSocket { + if *DEBUG_FLAG { + client(socket_addr, Some(DEBUG_CONNECTION_TIMEOUT)) + } else { + client(socket_addr, Some(DEFAULT_CONNECTION_TIMEOUT)) + } +} + /// Error occurred while establising socket or endpoint #[derive(Debug)] pub enum SocketError { diff --git a/parity/boot.rs b/parity/boot.rs index aa0e4b82b..0b0e6b670 100644 --- a/parity/boot.rs +++ b/parity/boot.rs @@ -63,7 +63,7 @@ pub fn payload() -> Result { } pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> GuardedSocket>{ - let hypervisor_client = nanoipc::init_client::>(hv_url).unwrap(); + let hypervisor_client = nanoipc::fast_client::>(hv_url).unwrap(); hypervisor_client.handshake().unwrap(); hypervisor_client.module_ready(module_id, control_url.to_owned()); @@ -73,7 +73,7 @@ pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> Guar pub fn dependency>(url: &str) -> Result, BootError> { - nanoipc::init_client::(url).map_err(|socket_err| BootError::DependencyConnect(socket_err)) + nanoipc::generic_client::(url).map_err(|socket_err| BootError::DependencyConnect(socket_err)) } pub fn main_thread() -> Arc { diff --git a/parity/io_handler.rs b/parity/io_handler.rs index d60f80f9a..8386c2d1b 100644 --- a/parity/io_handler.rs +++ b/parity/io_handler.rs @@ -15,6 +15,7 @@ // along with Parity. If not, see . use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use ethcore::client::Client; use ethcore::service::ClientIoMessage; use ethsync::{SyncProvider, ManageNetwork}; @@ -31,6 +32,7 @@ pub struct ClientIoHandler { pub net: Arc, pub accounts: Arc, pub info: Arc, + pub shutdown: Arc } impl IoHandler for ClientIoHandler { @@ -39,7 +41,7 @@ impl IoHandler for ClientIoHandler { } fn timeout(&self, _io: &IoContext, timer: TimerToken) { - if let INFO_TIMER = timer { + if timer == INFO_TIMER && !self.shutdown.load(Ordering::SeqCst) { self.info.tick(); } } diff --git a/parity/modules.rs b/parity/modules.rs index 73de6ca29..53cef4741 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -71,7 +71,7 @@ mod ipc_deps { pub use ethsync::{SyncClient, NetworkManagerClient, ServiceConfiguration}; pub use ethcore::client::ChainNotifyClient; pub use hypervisor::{SYNC_MODULE_ID, BootArgs, HYPERVISOR_IPC_URL}; - pub use nanoipc::{GuardedSocket, NanoSocket, init_client}; + pub use nanoipc::{GuardedSocket, NanoSocket, generic_client, fast_client}; pub use ipc::IpcSocket; pub use ipc::binary::serialize; } @@ -134,11 +134,11 @@ pub fn sync hypervisor.start(); hypervisor.wait_for_startup(); - let sync_client = init_client::>( + let sync_client = generic_client::>( &service_urls::with_base(&hypervisor.io_path, service_urls::SYNC)).unwrap(); - let notify_client = init_client::>( + let notify_client = generic_client::>( &service_urls::with_base(&hypervisor.io_path, service_urls::SYNC_NOTIFY)).unwrap(); - let manage_client = init_client::>( + let manage_client = generic_client::>( &service_urls::with_base(&hypervisor.io_path, service_urls::NETWORK_MANAGER)).unwrap(); *hypervisor_ref = Some(hypervisor); diff --git a/parity/run.rs b/parity/run.rs index 720e6f1bf..cefd8bb21 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -257,8 +257,9 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { sync: sync_provider.clone(), net: manage_network.clone(), accounts: account_provider.clone(), + shutdown: Default::default(), }); - service.register_io_handler(io_handler).expect("Error registering IO handler"); + service.register_io_handler(io_handler.clone()).expect("Error registering IO handler"); // the watcher must be kept alive. let _watcher = match cmd.no_periodic_snapshot { @@ -289,6 +290,11 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { // Handle exit wait_for_exit(panic_handler, http_server, ipc_server, dapps_server, signer_server); + // to make sure timer does not spawn requests while shutdown is in progress + io_handler.shutdown.store(true, ::std::sync::atomic::Ordering::SeqCst); + // just Arc is dropping here, to allow other reference release in its default time + drop(io_handler); + // hypervisor should be shutdown first while everything still works and can be // terminated gracefully drop(hypervisor);