Merge pull request #2046 from ethcore/ipc-tweaks-3

IPC tweaks
This commit is contained in:
Robert Habermeier 2016-09-09 17:31:59 +02:00 committed by GitHub
commit 53b22da1c1
12 changed files with 68 additions and 25 deletions

1
Cargo.lock generated
View File

@ -390,6 +390,7 @@ name = "ethcore-ipc-nano"
version = "1.4.0" version = "1.4.0"
dependencies = [ dependencies = [
"ethcore-ipc 1.4.0", "ethcore-ipc 1.4.0",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)",
] ]

View File

@ -460,7 +460,7 @@ mod client_tests {
crossbeam::scope(move |scope| { crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false)); let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url); run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap(); let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap(); client.open_default(path.as_str().to_owned()).unwrap();
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
client.close().unwrap(); client.close().unwrap();
@ -477,7 +477,7 @@ mod client_tests {
crossbeam::scope(move |scope| { crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false)); let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url); run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap(); let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap(); client.open_default(path.as_str().to_owned()).unwrap();
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
@ -498,7 +498,7 @@ mod client_tests {
crossbeam::scope(move |scope| { crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false)); let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url); run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap(); let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap(); client.open_default(path.as_str().to_owned()).unwrap();
assert!(client.get("xxx".as_bytes()).unwrap().is_none()); assert!(client.get("xxx".as_bytes()).unwrap().is_none());
@ -516,7 +516,7 @@ mod client_tests {
crossbeam::scope(move |scope| { crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false)); let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url); run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap(); let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap(); client.open_default(path.as_str().to_owned()).unwrap();
let transaction = DBTransaction::new(); let transaction = DBTransaction::new();
@ -541,7 +541,7 @@ mod client_tests {
let stop = StopGuard::new(); let stop = StopGuard::new();
run_worker(&scope, stop.share(), url); run_worker(&scope, stop.share(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap(); let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap(); client.open_default(path.as_str().to_owned()).unwrap();
let mut batch = Vec::new(); let mut batch = Vec::new();

View File

@ -66,13 +66,13 @@ pub fn extras_service_url(db_path: &str) -> Result<String, ::std::io::Error> {
pub fn blocks_client(db_path: &str) -> Result<DatabaseConnection, ServiceError> { pub fn blocks_client(db_path: &str) -> Result<DatabaseConnection, ServiceError> {
let url = try!(blocks_service_url(db_path)); let url = try!(blocks_service_url(db_path));
let client = try!(nanoipc::init_client::<DatabaseClient<_>>(&url)); let client = try!(nanoipc::generic_client::<DatabaseClient<_>>(&url));
Ok(client) Ok(client)
} }
pub fn extras_client(db_path: &str) -> Result<DatabaseConnection, ServiceError> { pub fn extras_client(db_path: &str) -> Result<DatabaseConnection, ServiceError> {
let url = try!(extras_service_url(db_path)); let url = try!(extras_service_url(db_path));
let client = try!(nanoipc::init_client::<DatabaseClient<_>>(&url)); let client = try!(nanoipc::generic_client::<DatabaseClient<_>>(&url));
Ok(client) Ok(client)
} }

View File

@ -56,7 +56,7 @@ fn can_handshake() {
let stop_guard = StopGuard::new(); let stop_guard = StopGuard::new();
let socket_path = "ipc:///tmp/parity-client-rpc-10.ipc"; let socket_path = "ipc:///tmp/parity-client-rpc-10.ipc";
run_test_worker(scope, stop_guard.share(), socket_path); run_test_worker(scope, stop_guard.share(), socket_path);
let remote_client = nanoipc::init_client::<RemoteClient<_>>(socket_path).unwrap(); let remote_client = nanoipc::generic_client::<RemoteClient<_>>(socket_path).unwrap();
assert!(remote_client.handshake().is_ok()); assert!(remote_client.handshake().is_ok());
}) })
@ -68,7 +68,7 @@ fn can_query_block() {
let stop_guard = StopGuard::new(); let stop_guard = StopGuard::new();
let socket_path = "ipc:///tmp/parity-client-rpc-20.ipc"; let socket_path = "ipc:///tmp/parity-client-rpc-20.ipc";
run_test_worker(scope, stop_guard.share(), socket_path); run_test_worker(scope, stop_guard.share(), socket_path);
let remote_client = nanoipc::init_client::<RemoteClient<_>>(socket_path).unwrap(); let remote_client = nanoipc::generic_client::<RemoteClient<_>>(socket_path).unwrap();
let non_existant_block = remote_client.block_header(BlockID::Number(999)); let non_existant_block = remote_client.block_header(BlockID::Number(999));

View File

@ -240,7 +240,7 @@ mod tests {
::std::thread::spawn(move || { ::std::thread::spawn(move || {
while !hypervisor_ready.load(Ordering::Relaxed) { } while !hypervisor_ready.load(Ordering::Relaxed) { }
let client = nanoipc::init_client::<HypervisorServiceClient<_>>(url).unwrap(); let client = nanoipc::fast_client::<HypervisorServiceClient<_>>(url).unwrap();
client.handshake().unwrap(); client.handshake().unwrap();
client.module_ready(test_module_id); client.module_ready(test_module_id);
}); });

View File

@ -110,7 +110,7 @@ impl HypervisorService {
let modules = self.modules.read().unwrap(); let modules = self.modules.read().unwrap();
modules.get(&module_id).map(|module| { modules.get(&module_id).map(|module| {
trace!(target: "hypervisor", "Sending shutdown to {}({})", module_id, &module.control_url); trace!(target: "hypervisor", "Sending shutdown to {}({})", module_id, &module.control_url);
let client = nanoipc::init_client::<ControlServiceClient<_>>(&module.control_url).unwrap(); let client = nanoipc::fast_client::<ControlServiceClient<_>>(&module.control_url).unwrap();
client.shutdown(); client.shutdown();
trace!(target: "hypervisor", "Sent shutdown to {}", module_id); trace!(target: "hypervisor", "Sent shutdown to {}", module_id);
}); });

View File

@ -10,4 +10,4 @@ license = "GPL-3.0"
ethcore-ipc = { path = "../rpc" } ethcore-ipc = { path = "../rpc" }
nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" } nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" }
log = "0.3" log = "0.3"
lazy_static = "0.2"

View File

@ -19,6 +19,7 @@
extern crate ethcore_ipc as ipc; extern crate ethcore_ipc as ipc;
extern crate nanomsg; extern crate nanomsg;
#[macro_use] extern crate log; #[macro_use] extern crate log;
#[macro_use] extern crate lazy_static;
pub use ipc::{WithSocket, IpcInterface, IpcConfig}; pub use ipc::{WithSocket, IpcInterface, IpcConfig};
pub use nanomsg::Socket as NanoSocket; pub use nanomsg::Socket as NanoSocket;
@ -28,7 +29,8 @@ use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut}
use std::ops::Deref; use std::ops::Deref;
const POLL_TIMEOUT: isize = 200; const POLL_TIMEOUT: isize = 200;
const CLIENT_CONNECTION_TIMEOUT: isize = 120000; const DEFAULT_CONNECTION_TIMEOUT: isize = 30000;
const DEBUG_CONNECTION_TIMEOUT: isize = 5000;
/// Generic worker to handle service (binded) sockets /// Generic worker to handle service (binded) sockets
pub struct Worker<S: ?Sized> where S: IpcInterface { pub struct Worker<S: ?Sized> where S: IpcInterface {
@ -68,7 +70,7 @@ pub fn init_duplex_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, Sock
SocketError::DuplexLink SocketError::DuplexLink
})); }));
socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap(); socket.set_receive_timeout(DEFAULT_CONNECTION_TIMEOUT).unwrap();
let endpoint = try!(socket.connect(socket_addr).map_err(|e| { let endpoint = try!(socket.connect(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
@ -84,26 +86,58 @@ pub fn init_duplex_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, Sock
/// Spawns client <`S`> over specified address /// Spawns client <`S`> over specified address
/// creates socket and connects endpoint to it /// creates socket and connects endpoint to it
/// for request-reply connections to the service /// for request-reply connections to the service
pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> { pub fn client<S>(socket_addr: &str, receive_timeout: Option<isize>) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
let mut socket = try!(Socket::new(Protocol::Req).map_err(|e| { let mut socket = try!(Socket::new(Protocol::Req).map_err(|e| {
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
SocketError::RequestLink SocketError::RequestLink
})); }));
socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap(); if let Some(timeout) = receive_timeout {
socket.set_receive_timeout(timeout).unwrap();
}
let endpoint = try!(socket.connect(socket_addr).map_err(|e| { let endpoint = try!(socket.connect(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
SocketError::RequestLink SocketError::RequestLink
})); }));
trace!(target: "ipc", "Created cleint for {}", socket_addr); trace!(target: "ipc", "Created client for {}", socket_addr);
Ok(GuardedSocket { Ok(GuardedSocket {
client: Arc::new(S::init(socket)), client: Arc::new(S::init(socket)),
_endpoint: endpoint, _endpoint: endpoint,
}) })
} }
lazy_static! {
/// Set PARITY_IPC_DEBUG=1 for fail-fast connectivity problems diagnostic
pub static ref DEBUG_FLAG: bool = {
use std::env;
if let Ok(debug) = env::var("PARITY_IPC_DEBUG") {
debug == "1" || debug.to_uppercase() == "TRUE"
}
else { false }
};
}
/// Client with no default timeout on operations
pub fn generic_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
if *DEBUG_FLAG {
client(socket_addr, Some(DEBUG_CONNECTION_TIMEOUT))
} else {
client(socket_addr, None)
}
}
/// Client over interface that is supposed to give quick almost non-blocking responses
pub fn fast_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
if *DEBUG_FLAG {
client(socket_addr, Some(DEBUG_CONNECTION_TIMEOUT))
} else {
client(socket_addr, Some(DEFAULT_CONNECTION_TIMEOUT))
}
}
/// Error occurred while establising socket or endpoint /// Error occurred while establising socket or endpoint
#[derive(Debug)] #[derive(Debug)]
pub enum SocketError { pub enum SocketError {

View File

@ -63,7 +63,7 @@ pub fn payload<B: ipc::BinaryConvertable>() -> Result<B, BootError> {
} }
pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> GuardedSocket<HypervisorServiceClient<NanoSocket>>{ pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> GuardedSocket<HypervisorServiceClient<NanoSocket>>{
let hypervisor_client = nanoipc::init_client::<HypervisorServiceClient<_>>(hv_url).unwrap(); let hypervisor_client = nanoipc::fast_client::<HypervisorServiceClient<_>>(hv_url).unwrap();
hypervisor_client.handshake().unwrap(); hypervisor_client.handshake().unwrap();
hypervisor_client.module_ready(module_id, control_url.to_owned()); hypervisor_client.module_ready(module_id, control_url.to_owned());
@ -73,7 +73,7 @@ pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> Guar
pub fn dependency<C: WithSocket<NanoSocket>>(url: &str) pub fn dependency<C: WithSocket<NanoSocket>>(url: &str)
-> Result<GuardedSocket<C>, BootError> -> Result<GuardedSocket<C>, BootError>
{ {
nanoipc::init_client::<C>(url).map_err(|socket_err| BootError::DependencyConnect(socket_err)) nanoipc::generic_client::<C>(url).map_err(|socket_err| BootError::DependencyConnect(socket_err))
} }
pub fn main_thread() -> Arc<AtomicBool> { pub fn main_thread() -> Arc<AtomicBool> {

View File

@ -15,6 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use ethcore::client::Client; use ethcore::client::Client;
use ethcore::service::ClientIoMessage; use ethcore::service::ClientIoMessage;
use ethsync::{SyncProvider, ManageNetwork}; use ethsync::{SyncProvider, ManageNetwork};
@ -31,6 +32,7 @@ pub struct ClientIoHandler {
pub net: Arc<ManageNetwork>, pub net: Arc<ManageNetwork>,
pub accounts: Arc<AccountProvider>, pub accounts: Arc<AccountProvider>,
pub info: Arc<Informant>, pub info: Arc<Informant>,
pub shutdown: Arc<AtomicBool>
} }
impl IoHandler<ClientIoMessage> for ClientIoHandler { impl IoHandler<ClientIoMessage> for ClientIoHandler {
@ -39,7 +41,7 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
} }
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) { fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
if let INFO_TIMER = timer { if timer == INFO_TIMER && !self.shutdown.load(Ordering::SeqCst) {
self.info.tick(); self.info.tick();
} }
} }

View File

@ -71,7 +71,7 @@ mod ipc_deps {
pub use ethsync::{SyncClient, NetworkManagerClient, ServiceConfiguration}; pub use ethsync::{SyncClient, NetworkManagerClient, ServiceConfiguration};
pub use ethcore::client::ChainNotifyClient; pub use ethcore::client::ChainNotifyClient;
pub use hypervisor::{SYNC_MODULE_ID, BootArgs, HYPERVISOR_IPC_URL}; pub use hypervisor::{SYNC_MODULE_ID, BootArgs, HYPERVISOR_IPC_URL};
pub use nanoipc::{GuardedSocket, NanoSocket, init_client}; pub use nanoipc::{GuardedSocket, NanoSocket, generic_client, fast_client};
pub use ipc::IpcSocket; pub use ipc::IpcSocket;
pub use ipc::binary::serialize; pub use ipc::binary::serialize;
} }
@ -134,11 +134,11 @@ pub fn sync
hypervisor.start(); hypervisor.start();
hypervisor.wait_for_startup(); hypervisor.wait_for_startup();
let sync_client = init_client::<SyncClient<_>>( let sync_client = generic_client::<SyncClient<_>>(
&service_urls::with_base(&hypervisor.io_path, service_urls::SYNC)).unwrap(); &service_urls::with_base(&hypervisor.io_path, service_urls::SYNC)).unwrap();
let notify_client = init_client::<ChainNotifyClient<_>>( let notify_client = generic_client::<ChainNotifyClient<_>>(
&service_urls::with_base(&hypervisor.io_path, service_urls::SYNC_NOTIFY)).unwrap(); &service_urls::with_base(&hypervisor.io_path, service_urls::SYNC_NOTIFY)).unwrap();
let manage_client = init_client::<NetworkManagerClient<_>>( let manage_client = generic_client::<NetworkManagerClient<_>>(
&service_urls::with_base(&hypervisor.io_path, service_urls::NETWORK_MANAGER)).unwrap(); &service_urls::with_base(&hypervisor.io_path, service_urls::NETWORK_MANAGER)).unwrap();
*hypervisor_ref = Some(hypervisor); *hypervisor_ref = Some(hypervisor);

View File

@ -257,8 +257,9 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
sync: sync_provider.clone(), sync: sync_provider.clone(),
net: manage_network.clone(), net: manage_network.clone(),
accounts: account_provider.clone(), accounts: account_provider.clone(),
shutdown: Default::default(),
}); });
service.register_io_handler(io_handler).expect("Error registering IO handler"); service.register_io_handler(io_handler.clone()).expect("Error registering IO handler");
// the watcher must be kept alive. // the watcher must be kept alive.
let _watcher = match cmd.no_periodic_snapshot { let _watcher = match cmd.no_periodic_snapshot {
@ -289,6 +290,11 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
// Handle exit // Handle exit
wait_for_exit(panic_handler, http_server, ipc_server, dapps_server, signer_server); wait_for_exit(panic_handler, http_server, ipc_server, dapps_server, signer_server);
// to make sure timer does not spawn requests while shutdown is in progress
io_handler.shutdown.store(true, ::std::sync::atomic::Ordering::SeqCst);
// just Arc is dropping here, to allow other reference release in its default time
drop(io_handler);
// hypervisor should be shutdown first while everything still works and can be // hypervisor should be shutdown first while everything still works and can be
// terminated gracefully // terminated gracefully
drop(hypervisor); drop(hypervisor);