From 8ab56ea3d1e0043a728c868534da21b3c17f7fea Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 20 Jul 2016 18:13:56 +0200 Subject: [PATCH] IPC (feature-gated) (#1654) * moving ipc deriving to trait * refactoring of the client * all compiled * proved all working * warnings purged * allow hypervisor to specify initialization payload in two ways * using binary initialisation payload for sync * some docs * logger to separate crate * log settings for sync bin * forwarding logging arguments to the sync --- Cargo.lock | 14 +++ Cargo.toml | 2 + ethcore/Cargo.toml | 1 + ethcore/src/client/client.rs | 4 +- ethcore/src/client/mod.rs | 2 +- ethcore/src/service.rs | 31 +++++- ipc/codegen/src/codegen.rs | 12 ++- ipc/codegen/src/serialization.rs | 12 ++- ipc/hypervisor/src/lib.rs | 60 ++++++++++-- ipc/hypervisor/src/service.rs.in | 1 + ipc/nano/src/lib.rs | 5 +- ipc/rpc/src/binary.rs | 37 ++++++++ logger/Cargo.toml | 19 ++++ parity/setup_log.rs => logger/src/lib.rs | 51 +++++++++- parity/configuration.rs | 15 +++ parity/main.rs | 20 ++-- parity/modules.rs | 116 ++++++++++++++++++++--- parity/sync/main.rs | 83 ++++++++-------- sync/src/api.rs | 7 ++ sync/src/lib.rs | 3 +- sync/src/tests/mod.rs | 3 +- sync/src/tests/rpc.rs | 29 ++++++ 22 files changed, 439 insertions(+), 88 deletions(-) create mode 100644 logger/Cargo.toml rename parity/setup_log.rs => logger/src/lib.rs (73%) create mode 100644 sync/src/tests/rpc.rs diff --git a/Cargo.lock b/Cargo.lock index c60615688..974a50f96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,6 +15,7 @@ dependencies = [ "ethcore-ipc-codegen 1.3.0", "ethcore-ipc-hypervisor 1.2.0", "ethcore-ipc-nano 1.3.0", + "ethcore-logger 1.3.0", "ethcore-rpc 1.3.0", "ethcore-signer 1.3.0", "ethcore-util 1.3.0", @@ -343,6 +344,19 @@ dependencies = [ "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", ] +[[package]] +name = "ethcore-logger" +version = "1.3.0" +dependencies = [ + "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "ethcore-util 1.3.0", + "isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "regex 0.1.68 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ethcore-rpc" version = "1.3.0" diff --git a/Cargo.toml b/Cargo.toml index ee60a704f..6c71a1f6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ ethcore-signer = { path = "signer" } ethcore-ipc-nano = { path = "ipc/nano" } ethcore-ipc = { path = "ipc/rpc" } ethcore-ipc-hypervisor = { path = "ipc/hypervisor" } +ethcore-logger = { path = "logger" } json-ipc-server = { git = "https://github.com/ethcore/json-ipc-server.git" } ethcore-dapps = { path = "dapps", optional = true } clippy = { version = "0.0.79", optional = true} @@ -58,6 +59,7 @@ dapps = ["ethcore-dapps"] dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev", "ethcore-dapps/dev", "ethcore-signer/dev"] travis-beta = ["ethcore/json-tests"] travis-nightly = ["ethcore/json-tests", "dev"] +ipc = ["ethcore/ipc"] [[bin]] path = "parity/main.rs" diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index 8b10fadea..dd2a1f91e 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -46,3 +46,4 @@ test-heavy = [] dev = ["clippy"] default = [] benches = [] +ipc = [] diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 7cc909fa7..e804cde57 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -235,8 +235,8 @@ impl Client { } /// Adds an actor to be notified on certain events - pub fn add_notify(&self, target: &Arc) { - self.notify.write().push(Arc::downgrade(target)); + pub fn add_notify(&self, target: Arc) { + self.notify.write().push(Arc::downgrade(&target)); } fn notify(&self, f: F) where F: Fn(&ChainNotify) { diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index 0d5f2a941..4bcec0169 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -30,7 +30,7 @@ pub use self::test_client::{TestBlockChainClient, EachBlockWith}; pub use types::trace_filter::Filter as TraceFilter; pub use executive::{Executed, Executive, TransactOptions}; pub use env_info::{LastHashes, EnvInfo}; -pub use self::chain_notify::ChainNotify; +pub use self::chain_notify::{ChainNotify, ChainNotifyClient}; pub use types::call_analytics::CallAnalytics; pub use block_import_error::BlockImportError; diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 3ea1dd114..57b72cb2e 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -22,6 +22,12 @@ use spec::Spec; use error::*; use client::{Client, ClientConfig, ChainNotify}; use miner::Miner; +use std::sync::atomic::AtomicBool; + +#[cfg(feature="ipc")] +use nanoipc; +#[cfg(feature="ipc")] +use client::BlockChainClient; /// Message type for external and internal events #[derive(Clone)] @@ -38,7 +44,8 @@ pub enum ClientIoMessage { pub struct ClientService { io_service: Arc>, client: Arc, - panic_handler: Arc + panic_handler: Arc, + _stop_guard: ::devtools::StopGuard, } impl ClientService { @@ -62,10 +69,14 @@ impl ClientService { }); try!(io_service.register_handler(client_io)); + let stop_guard = ::devtools::StopGuard::new(); + run_ipc(client.clone(), stop_guard.share()); + Ok(ClientService { io_service: Arc::new(io_service), client: client, panic_handler: panic_handler, + _stop_guard: stop_guard, }) } @@ -90,7 +101,7 @@ impl ClientService { } /// Set the actor to be notified on certain chain events - pub fn add_notify(&self, notify: &Arc) { + pub fn add_notify(&self, notify: Arc) { self.client.add_notify(notify); } } @@ -130,6 +141,22 @@ impl IoHandler for ClientIoHandler { } } +#[cfg(feature="ipc")] +fn run_ipc(client: Arc, stop: Arc) { + ::std::thread::spawn(move || { + let mut worker = nanoipc::Worker::new(&(client as Arc)); + worker.add_reqrep("ipc:///tmp/parity-chain.ipc").expect("Ipc expected to initialize with no issues"); + + while !stop.load(::std::sync::atomic::Ordering::Relaxed) { + worker.poll(); + } + }); +} + +#[cfg(not(feature="ipc"))] +fn run_ipc(_client: Arc, _stop: Arc) { +} + #[cfg(test)] mod tests { use super::*; diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index 0849acc0a..e9d80c7e9 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -273,7 +273,12 @@ fn implement_dispatch_arm( { let index_ident = builder.id(format!("{}", index + (RESERVED_MESSAGE_IDS as u32)).as_str()); let invoke_expr = implement_dispatch_arm_invoke(cx, builder, dispatch, buffer); - quote_arm!(cx, $index_ident => { $invoke_expr } ) + let dispatching_trace = "Dispatching: ".to_owned() + &dispatch.function_name; + let dispatching_trace_literal = builder.expr().lit().str::<&str>(&dispatching_trace); + quote_arm!(cx, $index_ident => { + trace!(target: "ipc", $dispatching_trace_literal); + $invoke_expr + }) } fn implement_dispatch_arms( @@ -420,17 +425,22 @@ fn implement_client_method_body( request_serialization_statements }; + let invocation_trace = "Invoking: ".to_owned() + &dispatch.function_name; + let invocation_trace_literal = builder.expr().lit().str::<&str>(&invocation_trace); + if let Some(ref return_ty) = dispatch.return_type_ty { let return_expr = quote_expr!(cx, ::ipc::binary::deserialize_from::<$return_ty, _>(&mut *socket).unwrap() ); quote_expr!(cx, { + trace!(target: "ipc", $invocation_trace_literal); $request $return_expr }) } else { quote_expr!(cx, { + trace!(target: "ipc", $invocation_trace_literal); $request }) } diff --git a/ipc/codegen/src/serialization.rs b/ipc/codegen/src/serialization.rs index 983fabd48..60b54edd4 100644 --- a/ipc/codegen/src/serialization.rs +++ b/ipc/codegen/src/serialization.rs @@ -261,6 +261,10 @@ fn binary_expr_struct( let raw_ident = ::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty)); let range_ident = builder.id(format!("r{}", index)); + + let error_message = "Error serializing member: ".to_owned() + &::syntax::print::pprust::expr_to_string(&member_expr); + let error_message_literal = builder.expr().lit().str::<&str>(&error_message); + match raw_ident.as_ref() { "u8" => { write_stmts.push(quote_stmt!(cx, let next_line = offset + 1;).unwrap()); @@ -280,7 +284,13 @@ fn binary_expr_struct( }).unwrap()); write_stmts.push(quote_stmt!(cx, let $range_ident = offset..next_line; ).unwrap()); post_write_stmts.push(quote_stmt!(cx, - if let Err(e) = $member_expr .to_bytes(&mut buffer[$range_ident], length_stack) { return Err(e) };).unwrap()); + if $range_ident.end - $range_ident.start > 0 { + if let Err(e) = $member_expr .to_bytes(&mut buffer[$range_ident], length_stack) { + warn!(target: "ipc", $error_message_literal); + return Err(e) + }; + } + ).unwrap()); } } diff --git a/ipc/hypervisor/src/lib.rs b/ipc/hypervisor/src/lib.rs index 0a4d39390..16b6896bd 100644 --- a/ipc/hypervisor/src/lib.rs +++ b/ipc/hypervisor/src/lib.rs @@ -42,7 +42,35 @@ pub struct Hypervisor { service: Arc, ipc_worker: RwLock>, processes: RwLock>, - modules: HashMap)>, + modules: HashMap, +} + +/// Boot arguments for binary +pub struct BootArgs { + cli: Option>, + stdin: Option>, +} + +impl BootArgs { + /// New empty boot arguments + pub fn new() -> BootArgs { + BootArgs { + cli: None, + stdin: None, + } + } + + /// Set command-line arguments for boot + pub fn cli(mut self, cli: Vec) -> BootArgs { + self.cli = Some(cli); + self + } + + /// Set std-in stream for boot + pub fn stdin(mut self, stdin: Vec) -> BootArgs { + self.stdin = Some(stdin); + self + } } impl Hypervisor { @@ -51,7 +79,7 @@ impl Hypervisor { Hypervisor::with_url(HYPERVISOR_IPC_URL) } - pub fn module(mut self, module_id: IpcModuleId, binary_id: BinaryId, args: Vec) -> Hypervisor { + pub fn module(mut self, module_id: IpcModuleId, binary_id: BinaryId, args: BootArgs) -> Hypervisor { self.modules.insert(module_id, (binary_id, args)); self.service.add_module(module_id); self @@ -78,7 +106,7 @@ impl Hypervisor { /// Since one binary can host multiple modules /// we match binaries - fn match_module(&self, module_id: &IpcModuleId) -> Option<&(BinaryId, Vec)> { + fn match_module(&self, module_id: &IpcModuleId) -> Option<&(BinaryId, BootArgs)> { self.modules.get(module_id) } @@ -96,6 +124,8 @@ impl Hypervisor { /// Does nothing when it is already started on module is inside the /// main binary fn start_module(&self, module_id: IpcModuleId) { + use std::io::Write; + self.match_module(&module_id).map(|&(ref binary_id, ref binary_args)| { let mut processes = self.processes.write().unwrap(); { @@ -109,13 +139,30 @@ impl Hypervisor { executable_path.pop(); executable_path.push(binary_id); - let mut command = Command::new(&executable_path.to_str().unwrap()); - for arg in binary_args { command.arg(arg); } + let executable_path = executable_path.to_str().unwrap(); + let mut command = Command::new(&executable_path); + command.stderr(std::process::Stdio::inherit()); + + if let Some(ref cli_args) = binary_args.cli { + for arg in cli_args { command.arg(arg); } + } + + command.stdin(std::process::Stdio::piped()); trace!(target: "hypervisor", "Spawn executable: {:?}", command); - let child = command.spawn().unwrap_or_else( + let mut child = command.spawn().unwrap_or_else( |e| panic!("Hypervisor cannot start binary ({:?}): {}", executable_path, e)); + + if let Some(ref std_in) = binary_args.stdin { + trace!(target: "hypervisor", "Pushing std-in payload..."); + child.stdin.as_mut() + .expect("std-in should be piped above") + .write(std_in) + .unwrap_or_else(|e| panic!(format!("Error trying to pipe stdin for {}: {:?}", &executable_path, e))); + drop(child.stdin.take()); + } + processes.insert(binary_id, child); }); } @@ -133,6 +180,7 @@ impl Hypervisor { } } + /// Shutdown the ipc and all managed child processes pub fn shutdown(&self, wait_time: Option) { if wait_time.is_some() { std::thread::sleep(wait_time.unwrap()) } diff --git a/ipc/hypervisor/src/service.rs.in b/ipc/hypervisor/src/service.rs.in index 7beffe23f..208e3efcf 100644 --- a/ipc/hypervisor/src/service.rs.in +++ b/ipc/hypervisor/src/service.rs.in @@ -60,6 +60,7 @@ impl HypervisorService { }) } + /// Add the module to the check-list pub fn add_module(&self, module_id: IpcModuleId) { self.check_list.write().unwrap().insert(module_id, false); } diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 7131797aa..f5fdef8ed 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -23,6 +23,7 @@ extern crate jsonrpc_core; use jsonrpc_core::IoHandler; pub use ipc::{WithSocket, IpcInterface, IpcConfig}; +pub use nanomsg::Socket as NanoSocket; use std::sync::*; use std::sync::atomic::*; @@ -54,9 +55,9 @@ impl GuardedSocket where S: WithSocket { } impl Deref for GuardedSocket where S: WithSocket { - type Target = S; + type Target = Arc; - fn deref(&self) -> &S { + fn deref(&self) -> &Arc { &self.client } } diff --git a/ipc/rpc/src/binary.rs b/ipc/rpc/src/binary.rs index e9cefe642..00c5ac9e6 100644 --- a/ipc/rpc/src/binary.rs +++ b/ipc/rpc/src/binary.rs @@ -54,6 +54,7 @@ impl BinaryConvertable for Option where T: BinaryConvertable { } fn from_bytes(buffer: &[u8], length_stack: &mut VecDeque) -> Result { + if buffer.len() == 0 { return Self::from_empty_bytes(); } Ok(Some(try!(T::from_bytes(buffer, length_stack)))) } @@ -779,6 +780,42 @@ fn serialize_into_deserialize_from() { assert_eq!(v, de_v); } +#[test] +fn serialize_vec_str() { + // empty + let source = Vec::::new(); + let serialized = serialize(&source).unwrap(); + let deserialized = deserialize::>(&serialized).unwrap(); + + assert_eq!(source, deserialized); + + // with few values + let mut source = Vec::::new(); + source.push("val1".to_owned()); + source.push("val2".to_owned()); + let serialized = serialize(&source).unwrap(); + let deserialized = deserialize::>(&serialized).unwrap(); + + assert_eq!(source, deserialized); +} + +#[test] +fn serialize_opt_str() { + // none + let source: Option = None; + let serialized = serialize(&source).unwrap(); + let deserialized = deserialize::>(&serialized).unwrap(); + + assert_eq!(source, deserialized); + + // value + let source: Option = Some("i have value".to_owned()); + let serialized = serialize(&source).unwrap(); + let deserialized = deserialize::>(&serialized).unwrap(); + + assert_eq!(source, deserialized); +} + #[test] fn serialize_opt_vec() { use std::io::Cursor; diff --git a/logger/Cargo.toml b/logger/Cargo.toml new file mode 100644 index 000000000..c8a491ec3 --- /dev/null +++ b/logger/Cargo.toml @@ -0,0 +1,19 @@ +[package] +description = "Ethcore client." +name = "ethcore-logger" +version = "1.3.0" +license = "GPL-3.0" +authors = ["Ethcore "] + +[dependencies] +log = "0.3" +env_logger = "0.3" +ethcore-util = { path = "../util" } +isatty = "0.1" +lazy_static = "0.2" +regex = "0.1" +time = "0.1" + +[profile.release] +debug = true +lto = false diff --git a/parity/setup_log.rs b/logger/src/lib.rs similarity index 73% rename from parity/setup_log.rs rename to logger/src/lib.rs index fc88a6745..a72ef8755 100644 --- a/parity/setup_log.rs +++ b/logger/src/lib.rs @@ -14,20 +14,61 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +//! Logger for parity executables + +extern crate ethcore_util as util; +#[macro_use] +extern crate log as rlog; +extern crate isatty; +extern crate regex; +extern crate env_logger; +extern crate time; +#[macro_use] +extern crate lazy_static; use std::env; use std::sync::Arc; use std::fs::File; use std::io::Write; use isatty::{stderr_isatty}; -use time; use env_logger::LogBuilder; use regex::Regex; use util::RotatingLogger; use util::log::Colour; +pub struct Settings { + pub color: bool, + pub init: Option, + pub file: Option, +} + +impl Settings { + pub fn new() -> Settings { + Settings { + color: true, + init: None, + file: None, + } + } + + pub fn init(mut self, init: String) -> Settings { + self.init = Some(init); + self + } + + pub fn file(mut self, file: String) -> Settings { + self.file = Some(file); + self + } + + pub fn no_color(mut self) -> Settings { + self.color = false; + self + } +} + /// Sets up the logger -pub fn setup_log(init: &Option, enable_color: bool, log_to_file: &Option) -> Arc { +pub fn setup_log(settings: &Settings) -> Arc { use rlog::*; let mut levels = String::new(); @@ -43,15 +84,15 @@ pub fn setup_log(init: &Option, enable_color: bool, log_to_file: &Option builder.parse(lvl); } - if let Some(ref s) = *init { + if let Some(ref s) = settings.init { levels.push_str(s); builder.parse(s); } - let enable_color = enable_color && stderr_isatty(); + let enable_color = settings.color && stderr_isatty(); let logs = Arc::new(RotatingLogger::new(levels)); let logger = logs.clone(); - let maybe_file = log_to_file.as_ref().map(|f| File::create(f).unwrap_or_else(|_| die!("Cannot write to log file given: {}", f))); + let maybe_file = settings.file.as_ref().map(|f| File::create(f).unwrap_or_else(|_| panic!("Cannot write to log file given: {}", f))); let format = move |record: &LogRecord| { let timestamp = time::strftime("%Y-%m-%d %H:%M:%S %Z", &time::now()).unwrap(); diff --git a/parity/configuration.rs b/parity/configuration.rs index 6094ba01b..af28a8665 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -34,6 +34,7 @@ use ethcore::ethereum; use ethcore::spec::Spec; use ethsync::SyncConfig; use rpc::IpcConfiguration; +use ethcore_logger::Settings as LogSettings; pub struct Configuration { pub args: Args @@ -564,6 +565,20 @@ impl Configuration { (self.args.flag_unlock.is_none() && !self.args.flag_no_signer) || self.args.flag_force_signer } + + pub fn log_settings(&self) -> LogSettings { + let mut settings = LogSettings::new(); + if self.args.flag_no_color || cfg!(windows) { + settings = settings.no_color(); + } + if let Some(ref init) = self.args.flag_logging { + settings = settings.init(init.to_owned()) + } + if let Some(ref file) = self.args.flag_log_file { + settings = settings.file(file.to_owned()) + } + settings + } } #[cfg(test)] diff --git a/parity/main.rs b/parity/main.rs index 94db419cf..b61a35247 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -53,15 +53,16 @@ extern crate ansi_term; #[macro_use] extern crate lazy_static; extern crate regex; +extern crate ethcore_logger; extern crate isatty; #[cfg(feature = "dapps")] extern crate ethcore_dapps; + #[macro_use] mod die; mod upgrade; -mod setup_log; mod rpc; mod dapps; mod informant; @@ -86,7 +87,7 @@ use rustc_serialize::hex::FromHex; use ctrlc::CtrlC; use util::{H256, ToPretty, PayloadInfo, Bytes, Colour, version, journaldb, RotatingLogger}; use util::panics::{MayPanic, ForwardPanic, PanicHandler}; -use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path, BlockImportError, Mode, ChainNotify}; +use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path, BlockImportError, Mode}; use ethcore::error::{ImportError}; use ethcore::service::ClientService; use ethcore::spec::Spec; @@ -95,6 +96,9 @@ use ethcore::miner::{Miner, MinerService, ExternalMiner}; use migration::migrate; use informant::Informant; use util::{Mutex, Condvar}; +use ethcore_logger::setup_log; +#[cfg(feature="ipc")] +use ethcore::client::ChainNotify; use die::*; use cli::print_version; @@ -132,7 +136,7 @@ fn execute(conf: Configuration) { // Setup panic handler let panic_handler = PanicHandler::new_in_arc(); // Setup logging - let logger = setup_log::setup_log(&conf.args.flag_logging, conf.have_color(), &conf.args.flag_log_file); + let logger = setup_log(&conf.log_settings()); // Raise fdlimit unsafe { ::fdlimit::raise_fd_limit(); } @@ -192,6 +196,8 @@ fn execute_upgrades(conf: &Configuration, spec: &Spec, client_config: &ClientCon } fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig, panic_handler: Arc, logger: Arc) { + let mut hypervisor = modules::hypervisor(); + info!("Starting {}", Colour::White.bold().paint(format!("{}", version()))); info!("Using state DB journalling strategy {}", Colour::White.bold().paint(match client_config.pruning { journaldb::Algorithm::Archive => "archive", @@ -244,9 +250,10 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig, // Sync let (sync_provider, manage_network, chain_notify) = - modules::sync(sync_config, NetworkConfiguration::from(net_settings), client.clone()) + modules::sync(&mut hypervisor, sync_config, NetworkConfiguration::from(net_settings), client.clone(), &conf.log_settings()) .unwrap_or_else(|e| die_with_error("Sync", e)); - service.add_notify(&chain_notify); + + service.add_notify(chain_notify.clone()); // if network is active by default if match conf.mode() { Mode::Dark(..) => false, _ => !conf.args.flag_no_network } { @@ -311,8 +318,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig, }); let informant = Arc::new(Informant::new(service.client(), Some(sync_provider.clone()), Some(manage_network.clone()), conf.have_color())); - let info_notify: Arc = informant.clone(); - service.add_notify(&info_notify); + service.add_notify(informant.clone()); // Register IO handler let io_handler = Arc::new(ClientIoHandler { client: service.client(), diff --git a/parity/modules.rs b/parity/modules.rs index 75a15e913..f7b14dd54 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -14,28 +14,116 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use ethsync::{EthSync, SyncProvider, ManageNetwork, SyncConfig, NetworkConfiguration}; use std::sync::Arc; -use ethcore::client::{ChainNotify, BlockChainClient}; +use ethcore::client::BlockChainClient; use ethcore; +use hypervisor::Hypervisor; +use ethsync::{SyncConfig, NetworkConfiguration}; +#[cfg(not(feature="ipc"))] +use self::no_ipc_deps::*; +#[cfg(feature="ipc")] +use self::ipc_deps::*; -pub type Modules = (Arc, Arc, Arc); +use ethcore_logger::Settings as LogSettings; + +#[cfg(not(feature="ipc"))] +mod no_ipc_deps { + pub use ethsync::{EthSync, SyncProvider, ManageNetwork}; + pub use ethcore::client::ChainNotify; +} #[cfg(feature="ipc")] -pub fn sync( - sync_cfg: SyncConfig, - net_cfg: NetworkConfiguration, - client: Arc) - -> Result -{ +pub type SyncModules = ( + GuardedSocket>, + GuardedSocket>, + GuardedSocket> +); + +#[cfg(not(feature="ipc"))] +pub type SyncModules = (Arc, Arc, Arc); + +#[cfg(feature="ipc")] +mod ipc_deps { + pub use ethsync::{SyncClient, NetworkManagerClient, ServiceConfiguration}; + pub use ethcore::client::ChainNotifyClient; + pub use hypervisor::{SYNC_MODULE_ID, BootArgs}; + pub use nanoipc::{GuardedSocket, NanoSocket, init_client}; + pub use ipc::IpcSocket; + pub use ipc::binary::serialize; +} + + +#[cfg(feature="ipc")] +pub fn hypervisor() -> Option { + Some(Hypervisor::new()) } #[cfg(not(feature="ipc"))] -pub fn sync( - sync_cfg: SyncConfig, - net_cfg: NetworkConfiguration, - client: Arc) - -> Result +pub fn hypervisor() -> Option { + None +} + +#[cfg(feature="ipc")] +fn sync_arguments(sync_cfg: SyncConfig, net_cfg: NetworkConfiguration, log_settings: &LogSettings) -> BootArgs { + let service_config = ServiceConfiguration { + sync: sync_cfg, + net: net_cfg, + }; + + // initialisation payload is passed via stdin + let service_payload = serialize(&service_config).expect("Any binary-derived struct is serializable by definition"); + + // client service url and logging settings are passed in command line + let mut cli_args = Vec::new(); + cli_args.push("ipc:///tmp/parity-chain.ipc".to_owned()); + if !log_settings.color { cli_args.push("--no-color".to_owned()); } + if let Some(ref init) = log_settings.init { + cli_args.push("-l".to_owned()); + cli_args.push(init.to_owned()); + } + if let Some(ref file) = log_settings.file { + cli_args.push("--log-file".to_owned()); + cli_args.push(file.to_owned()); + } + + BootArgs::new().stdin(service_payload).cli(cli_args) +} + +#[cfg(feature="ipc")] +pub fn sync + ( + hypervisor_ref: &mut Option, + sync_cfg: SyncConfig, + net_cfg: NetworkConfiguration, + _client: Arc, + log_settings: &LogSettings, + ) + -> Result +{ + let mut hypervisor = hypervisor_ref.take().expect("There should be hypervisor for ipc configuration"); + hypervisor = hypervisor.module(SYNC_MODULE_ID, "sync", sync_arguments(sync_cfg, net_cfg, log_settings)); + + hypervisor.start(); + hypervisor.wait_for_startup(); + + let sync_client = init_client::>("ipc:///tmp/parity-sync.ipc").unwrap(); + let notify_client = init_client::>("ipc:///tmp/parity-sync-notify.ipc").unwrap(); + let manage_client = init_client::>("ipc:///tmp/parity-manage-net.ipc").unwrap(); + + *hypervisor_ref = Some(hypervisor); + Ok((sync_client, manage_client, notify_client)) +} + +#[cfg(not(feature="ipc"))] +pub fn sync + ( + _hypervisor: &mut Option, + sync_cfg: SyncConfig, + net_cfg: NetworkConfiguration, + client: Arc, + _log_settings: &LogSettings, + ) + -> Result { let eth_sync = try!(EthSync::new(sync_cfg, client, net_cfg).map_err(ethcore::error::Error::Util)); Ok((eth_sync.clone() as Arc, eth_sync.clone() as Arc, eth_sync.clone() as Arc)) diff --git a/parity/sync/main.rs b/parity/sync/main.rs index 162f6cec1..272248785 100644 --- a/parity/sync/main.rs +++ b/parity/sync/main.rs @@ -26,6 +26,7 @@ extern crate rustc_serialize; extern crate docopt; extern crate ethcore; extern crate ethcore_util as util; +extern crate ethcore_logger; use std::sync::Arc; use hypervisor::{HypervisorServiceClient, SYNC_MODULE_ID, HYPERVISOR_IPC_URL}; @@ -33,63 +34,47 @@ use ctrlc::CtrlC; use std::sync::atomic::{AtomicBool, Ordering}; use docopt::Docopt; use ethcore::client::{RemoteClient, ChainNotify}; -use ethsync::{SyncProvider, SyncConfig, EthSync, ManageNetwork, NetworkConfiguration}; +use ethsync::{SyncProvider, EthSync, ManageNetwork, ServiceConfiguration}; use std::thread; -use util::numbers::{U256, H256}; -use std::str::FromStr; use nanoipc::IpcInterface; -use util::sha3::Hashable; + +use ethcore_logger::Settings as LogSettings; +use ethcore_logger::setup_log; const USAGE: &'static str = " Ethcore sync service Usage: - sync [options] + sync [options] -Options: - --public-address IP Public address. - --boot-nodes LIST List of boot nodes. - --reserved-nodes LIST List of reserved peers, - --secret HEX Use node key hash - --udp-port UDP port + Options: + -l --logging LOGGING Specify the logging level. Must conform to the same + format as RUST_LOG. + --log-file FILENAME Specify a filename into which logging should be + directed. + --no-color Don't use terminal color codes in output. "; #[derive(Debug, RustcDecodable)] struct Args { - arg_network_id: String, - arg_listen_address: String, - arg_nat_enabled: bool, - arg_discovery_enabled: bool, - arg_ideal_peers: u32, - arg_config_path: String, arg_client_url: String, - arg_allow_non_reserved: bool, - flag_public_address: Option, - flag_secret: Option, - flag_boot_nodes: Vec, - flag_reserved_nodes: Vec, - flag_udp_port: Option, + flag_logging: Option, + flag_log_file: Option, + flag_no_color: bool, } impl Args { - pub fn into_config(self) -> (SyncConfig, NetworkConfiguration, String) { - let mut sync_config = SyncConfig::default(); - sync_config.network_id = U256::from_str(&self.arg_network_id).unwrap(); - - let network_config = NetworkConfiguration { - udp_port: self.flag_udp_port, - nat_enabled: self.arg_nat_enabled, - boot_nodes: self.flag_boot_nodes, - listen_address: Some(self.arg_listen_address), - public_address: self.flag_public_address, - use_secret: self.flag_secret.as_ref().map(|s| H256::from_str(s).unwrap_or_else(|_| s.sha3())), - discovery_enabled: self.arg_discovery_enabled, - ideal_peers: self.arg_ideal_peers, - config_path: Some(self.arg_config_path), - reserved_nodes: self.flag_reserved_nodes, - allow_non_reserved: self.arg_allow_non_reserved, - }; - - (sync_config, network_config, self.arg_client_url) + pub fn log_settings(&self) -> LogSettings { + let mut settings = LogSettings::new(); + if self.flag_no_color || cfg!(windows) { + settings = settings.no_color(); + } + if let Some(ref init) = self.flag_logging { + settings = settings.init(init.to_owned()) + } + if let Some(ref file) = self.flag_log_file { + settings = settings.file(file.to_owned()) + } + settings } } @@ -106,16 +91,24 @@ fn run_service(addr: &str, stop_guard: Arc>(&client_url).unwrap(); + + setup_log(&args.log_settings()); + + let mut buffer = Vec::new(); + io::stdin().read_to_end(&mut buffer).expect("Failed to read initialisation payload"); + let service_config = ipc::binary::deserialize::(&buffer).expect("Failed deserializing initialisation payload"); + + let remote_client = nanoipc::init_client::>(&args.arg_client_url).unwrap(); remote_client.handshake().unwrap(); let stop = Arc::new(AtomicBool::new(false)); - let sync = EthSync::new(sync_config, remote_client.service().clone(), network_config).unwrap(); + let sync = EthSync::new(service_config.sync, remote_client.service().clone(), service_config.net).unwrap(); run_service("ipc:///tmp/parity-sync.ipc", stop.clone(), sync.clone() as Arc); run_service("ipc:///tmp/parity-manage-net.ipc", stop.clone(), sync.clone() as Arc); diff --git a/sync/src/api.rs b/sync/src/api.rs index 22b3d05a1..99429c232 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -32,6 +32,7 @@ use parking_lot::RwLock; pub const ETH_PROTOCOL: &'static str = "eth"; /// Sync configuration +#[derive(Debug, Clone)] pub struct SyncConfig { /// Max blocks to download ahead pub max_download_ahead_blocks: usize, @@ -272,3 +273,9 @@ impl From for NetworkConfiguration { } } } + +#[derive(Debug, Binary, Clone)] +pub struct ServiceConfiguration { + pub sync: SyncConfig, + pub net: NetworkConfiguration, +} diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 0c164ee6a..29a07393c 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -87,6 +87,7 @@ mod api { include!(concat!(env!("OUT_DIR"), "/api.ipc.rs")); } -pub use api::{EthSync, SyncProvider, ManageNetwork, SyncConfig, NetworkConfiguration}; +pub use api::{EthSync, SyncProvider, SyncClient, NetworkManagerClient, ManageNetwork, SyncConfig, + NetworkConfiguration, ServiceConfiguration}; pub use chain::{SyncStatus, SyncState}; diff --git a/sync/src/tests/mod.rs b/sync/src/tests/mod.rs index 496c6ab33..5afda05f0 100644 --- a/sync/src/tests/mod.rs +++ b/sync/src/tests/mod.rs @@ -15,4 +15,5 @@ // along with Parity. If not, see . pub mod helpers; -mod chain; \ No newline at end of file +mod chain; +mod rpc; diff --git a/sync/src/tests/rpc.rs b/sync/src/tests/rpc.rs new file mode 100644 index 000000000..05f4f242e --- /dev/null +++ b/sync/src/tests/rpc.rs @@ -0,0 +1,29 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use super::super::NetworkConfiguration; +use util::NetworkConfiguration as BasicNetworkConfiguration; +use std::convert::From; +use ipc::binary::{serialize, deserialize}; + +#[test] +fn network_settings_serialize() { + let net_cfg = NetworkConfiguration::from(BasicNetworkConfiguration::new_local()); + let serialized = serialize(&net_cfg).unwrap(); + let deserialized = deserialize::(&serialized).unwrap(); + + assert_eq!(net_cfg.udp_port, deserialized.udp_port); +}