further tweaking of ipc entities lifecycle

This commit is contained in:
Nikolay Volf 2016-09-03 11:31:29 +03:00
parent 9a5668f802
commit 2fc70902e7
12 changed files with 68 additions and 25 deletions

1
Cargo.lock generated
View File

@ -381,6 +381,7 @@ name = "ethcore-ipc-nano"
version = "1.4.0"
dependencies = [
"ethcore-ipc 1.4.0",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)",
]

View File

@ -460,7 +460,7 @@ mod client_tests {
crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap();
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
client.close().unwrap();
@ -477,7 +477,7 @@ mod client_tests {
crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap();
client.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
@ -498,7 +498,7 @@ mod client_tests {
crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap();
assert!(client.get("xxx".as_bytes()).unwrap().is_none());
@ -516,7 +516,7 @@ mod client_tests {
crossbeam::scope(move |scope| {
let stop = Arc::new(AtomicBool::new(false));
run_worker(scope, stop.clone(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap();
let transaction = DBTransaction::new();
@ -541,7 +541,7 @@ mod client_tests {
let stop = StopGuard::new();
run_worker(&scope, stop.share(), url);
let client = nanoipc::init_client::<DatabaseClient<_>>(url).unwrap();
let client = nanoipc::generic_client::<DatabaseClient<_>>(url).unwrap();
client.open_default(path.as_str().to_owned()).unwrap();
let mut batch = Vec::new();

View File

@ -66,13 +66,13 @@ pub fn extras_service_url(db_path: &str) -> Result<String, ::std::io::Error> {
pub fn blocks_client(db_path: &str) -> Result<DatabaseConnection, ServiceError> {
let url = try!(blocks_service_url(db_path));
let client = try!(nanoipc::init_client::<DatabaseClient<_>>(&url));
let client = try!(nanoipc::generic_client::<DatabaseClient<_>>(&url));
Ok(client)
}
pub fn extras_client(db_path: &str) -> Result<DatabaseConnection, ServiceError> {
let url = try!(extras_service_url(db_path));
let client = try!(nanoipc::init_client::<DatabaseClient<_>>(&url));
let client = try!(nanoipc::generic_client::<DatabaseClient<_>>(&url));
Ok(client)
}

View File

@ -51,7 +51,7 @@ fn can_handshake() {
let stop_guard = StopGuard::new();
let socket_path = "ipc:///tmp/parity-client-rpc-10.ipc";
run_test_worker(scope, stop_guard.share(), socket_path);
let remote_client = nanoipc::init_client::<RemoteClient<_>>(socket_path).unwrap();
let remote_client = nanoipc::generic_client::<RemoteClient<_>>(socket_path).unwrap();
assert!(remote_client.handshake().is_ok());
})
@ -63,7 +63,7 @@ fn can_query_block() {
let stop_guard = StopGuard::new();
let socket_path = "ipc:///tmp/parity-client-rpc-20.ipc";
run_test_worker(scope, stop_guard.share(), socket_path);
let remote_client = nanoipc::init_client::<RemoteClient<_>>(socket_path).unwrap();
let remote_client = nanoipc::generic_client::<RemoteClient<_>>(socket_path).unwrap();
let non_existant_block = remote_client.block_header(BlockID::Number(999));

View File

@ -240,7 +240,7 @@ mod tests {
::std::thread::spawn(move || {
while !hypervisor_ready.load(Ordering::Relaxed) { }
let client = nanoipc::init_client::<HypervisorServiceClient<_>>(url).unwrap();
let client = nanoipc::fast_client::<HypervisorServiceClient<_>>(url).unwrap();
client.handshake().unwrap();
client.module_ready(test_module_id);
});

View File

@ -110,7 +110,7 @@ impl HypervisorService {
let modules = self.modules.read().unwrap();
modules.get(&module_id).map(|module| {
trace!(target: "hypervisor", "Sending shutdown to {}({})", module_id, &module.control_url);
let client = nanoipc::init_client::<ControlServiceClient<_>>(&module.control_url).unwrap();
let client = nanoipc::fast_client::<ControlServiceClient<_>>(&module.control_url).unwrap();
client.shutdown();
trace!(target: "hypervisor", "Sent shutdown to {}", module_id);
});

View File

@ -10,4 +10,4 @@ license = "GPL-3.0"
ethcore-ipc = { path = "../rpc" }
nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" }
log = "0.3"
lazy_static = "0.2"

View File

@ -19,6 +19,7 @@
extern crate ethcore_ipc as ipc;
extern crate nanomsg;
#[macro_use] extern crate log;
#[macro_use] extern crate lazy_static;
pub use ipc::{WithSocket, IpcInterface, IpcConfig};
pub use nanomsg::Socket as NanoSocket;
@ -28,7 +29,8 @@ use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut}
use std::ops::Deref;
const POLL_TIMEOUT: isize = 200;
const CLIENT_CONNECTION_TIMEOUT: isize = 120000;
const DEFAULT_CONNECTION_TIMEOUT: isize = 30000;
const DEBUG_CONNECTION_TIMEOUT: isize = 5000;
/// Generic worker to handle service (binded) sockets
pub struct Worker<S: ?Sized> where S: IpcInterface {
@ -68,7 +70,7 @@ pub fn init_duplex_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, Sock
SocketError::DuplexLink
}));
socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap();
socket.set_receive_timeout(DEFAULT_CONNECTION_TIMEOUT).unwrap();
let endpoint = try!(socket.connect(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
@ -84,26 +86,58 @@ pub fn init_duplex_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, Sock
/// Spawns client <`S`> over specified address
/// creates socket and connects endpoint to it
/// for request-reply connections to the service
pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
pub fn client<S>(socket_addr: &str, receive_timeout: Option<isize>) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
let mut socket = try!(Socket::new(Protocol::Req).map_err(|e| {
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
SocketError::RequestLink
}));
socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap();
if let Some(timeout) = receive_timeout {
socket.set_receive_timeout(timeout).unwrap();
}
let endpoint = try!(socket.connect(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
SocketError::RequestLink
}));
trace!(target: "ipc", "Created cleint for {}", socket_addr);
trace!(target: "ipc", "Created client for {}", socket_addr);
Ok(GuardedSocket {
client: Arc::new(S::init(socket)),
_endpoint: endpoint,
})
}
lazy_static! {
/// Set PARITY_IPC_DEBUG=1 for fail-fast connectivity problems diagnostic
pub static ref DEBUG_FLAG: bool = {
use std::env;
if let Ok(debug) = env::var("PARITY_IPC_DEBUG") {
debug == "1" || debug.to_uppercase() == "TRUE"
}
else { false }
};
}
/// Client with no default timeout on operations
pub fn generic_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
if *DEBUG_FLAG {
client(socket_addr, Some(DEBUG_CONNECTION_TIMEOUT))
} else {
client(socket_addr, None)
}
}
/// Client over interface that is supposed to give quick almost non-blocking responses
pub fn fast_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
if *DEBUG_FLAG {
client(socket_addr, Some(DEBUG_CONNECTION_TIMEOUT))
} else {
client(socket_addr, Some(DEFAULT_CONNECTION_TIMEOUT))
}
}
/// Error occurred while establising socket or endpoint
#[derive(Debug)]
pub enum SocketError {

View File

@ -63,7 +63,7 @@ pub fn payload<B: ipc::BinaryConvertable>() -> Result<B, BootError> {
}
pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> GuardedSocket<HypervisorServiceClient<NanoSocket>>{
let hypervisor_client = nanoipc::init_client::<HypervisorServiceClient<_>>(hv_url).unwrap();
let hypervisor_client = nanoipc::fast_client::<HypervisorServiceClient<_>>(hv_url).unwrap();
hypervisor_client.handshake().unwrap();
hypervisor_client.module_ready(module_id, control_url.to_owned());
@ -73,7 +73,7 @@ pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> Guar
pub fn dependency<C: WithSocket<NanoSocket>>(url: &str)
-> Result<GuardedSocket<C>, BootError>
{
nanoipc::init_client::<C>(url).map_err(|socket_err| BootError::DependencyConnect(socket_err))
nanoipc::generic_client::<C>(url).map_err(|socket_err| BootError::DependencyConnect(socket_err))
}
pub fn main_thread() -> Arc<AtomicBool> {

View File

@ -15,6 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use ethcore::client::Client;
use ethcore::service::ClientIoMessage;
use ethsync::{SyncProvider, ManageNetwork};
@ -31,6 +32,7 @@ pub struct ClientIoHandler {
pub net: Arc<ManageNetwork>,
pub accounts: Arc<AccountProvider>,
pub info: Arc<Informant>,
pub shutdown: Arc<AtomicBool>
}
impl IoHandler<ClientIoMessage> for ClientIoHandler {
@ -39,7 +41,7 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
}
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
if let INFO_TIMER = timer {
if timer == INFO_TIMER && !self.shutdown.load(Ordering::SeqCst) {
self.info.tick();
}
}

View File

@ -68,7 +68,7 @@ mod ipc_deps {
pub use ethsync::{SyncClient, NetworkManagerClient, ServiceConfiguration};
pub use ethcore::client::ChainNotifyClient;
pub use hypervisor::{SYNC_MODULE_ID, BootArgs, HYPERVISOR_IPC_URL};
pub use nanoipc::{GuardedSocket, NanoSocket, init_client};
pub use nanoipc::{GuardedSocket, NanoSocket, generic_client, fast_client};
pub use ipc::IpcSocket;
pub use ipc::binary::serialize;
}
@ -130,11 +130,11 @@ pub fn sync
hypervisor.start();
hypervisor.wait_for_startup();
let sync_client = init_client::<SyncClient<_>>(
let sync_client = generic_client::<SyncClient<_>>(
&service_urls::with_base(&hypervisor.io_path, service_urls::SYNC)).unwrap();
let notify_client = init_client::<ChainNotifyClient<_>>(
let notify_client = generic_client::<ChainNotifyClient<_>>(
&service_urls::with_base(&hypervisor.io_path, service_urls::SYNC_NOTIFY)).unwrap();
let manage_client = init_client::<NetworkManagerClient<_>>(
let manage_client = generic_client::<NetworkManagerClient<_>>(
&service_urls::with_base(&hypervisor.io_path, service_urls::NETWORK_MANAGER)).unwrap();
*hypervisor_ref = Some(hypervisor);

View File

@ -246,8 +246,9 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
sync: sync_provider.clone(),
net: manage_network.clone(),
accounts: account_provider.clone(),
shutdown: Default::default(),
});
service.register_io_handler(io_handler).expect("Error registering IO handler");
service.register_io_handler(io_handler.clone()).expect("Error registering IO handler");
// start ui
if cmd.ui {
@ -260,6 +261,11 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
// Handle exit
wait_for_exit(panic_handler, http_server, ipc_server, dapps_server, signer_server);
// to make sure timer does not spawn requests while shutdown is in progress
io_handler.shutdown.store(true, ::std::sync::atomic::Ordering::SeqCst);
// just Arc is dropping here, to allow other reference release in its default time
drop(io_handler);
// hypervisor should be shutdown first while everything still works and can be
// terminated gracefully
drop(hypervisor);