From d6311624405b151ed817501d9dd6df8c27bac65d Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 24 Aug 2016 20:35:38 +0400 Subject: [PATCH] Stratum IPC service (#1959) * boot binaries reorg & helpers * stratum ipc service * spaces in cli --- Cargo.lock | 4 ++ Cargo.toml | 1 + parity/boot.rs | 123 ++++++++++++++++++++++++++++++++++++++++++ parity/main.rs | 40 ++++++++++++++ parity/modules.rs | 5 ++ parity/stratum.rs | 57 ++++++++++++++++++++ parity/sync.rs | 97 ++++++--------------------------- stratum/Cargo.toml | 7 +++ stratum/build.rs | 21 ++++++++ stratum/src/lib.rs | 17 ++++-- stratum/src/traits.rs | 21 ++++++-- 11 files changed, 304 insertions(+), 89 deletions(-) create mode 100644 parity/boot.rs create mode 100644 parity/stratum.rs create mode 100644 stratum/build.rs diff --git a/Cargo.lock b/Cargo.lock index 5bd5e8f32..5c06c5a62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -472,12 +472,16 @@ version = "1.4.0" dependencies = [ "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-devtools 1.4.0", + "ethcore-ipc 1.4.0", + "ethcore-ipc-codegen 1.4.0", + "ethcore-ipc-nano 1.4.0", "ethcore-util 1.4.0", "json-tcp-server 0.1.0 (git+https://github.com/ethcore/json-tcp-server)", "jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.5.1 (git+https://github.com/ethcore/mio?branch=v0.5.x)", + "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6ef8bcc7e..661544465 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ dapps = ["ethcore-dapps"] ipc = ["ethcore/ipc"] dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev", "ethcore-dapps/dev", "ethcore-signer/dev"] json-tests = ["ethcore/json-tests"] +stratum = ["ipc"] [[bin]] path = "parity/main.rs" diff --git a/parity/boot.rs b/parity/boot.rs new file mode 100644 index 000000000..ddc05437c --- /dev/null +++ b/parity/boot.rs @@ -0,0 +1,123 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Parity micro-service helpers + +use nanoipc; +use ipc; +use std; +use std::sync::Arc; +use hypervisor::HypervisorServiceClient; +use hypervisor::service::IpcModuleId; +use ctrlc::CtrlC; +use std::sync::atomic::{AtomicBool, Ordering}; +use nanoipc::{IpcInterface, GuardedSocket, NanoSocket}; +use ipc::WithSocket; +use ethcore_logger::{Config as LogConfig, setup_log}; +use docopt::Docopt; + +#[derive(Debug)] +pub enum BootError { + ReadArgs(std::io::Error), + DecodeArgs(ipc::binary::BinaryError), + DependencyConnect(nanoipc::SocketError), +} + +pub fn host_service(addr: &str, stop_guard: Arc, service: Arc) where T: IpcInterface { + let socket_url = addr.to_owned(); + std::thread::spawn(move || { + let mut worker = nanoipc::Worker::::new(&service); + worker.add_reqrep(&socket_url).unwrap(); + + while !stop_guard.load(Ordering::Relaxed) { + worker.poll(); + } + }); +} + +pub fn payload() -> Result { + use std::io; + use std::io::Read; + + let mut buffer = Vec::new(); + try!( + io::stdin().read_to_end(&mut buffer) + .map_err(|io_err| BootError::ReadArgs(io_err)) + ); + + ipc::binary::deserialize::(&buffer) + .map_err(|binary_error| BootError::DecodeArgs(binary_error)) +} + +pub fn register(hv_url: &str, module_id: IpcModuleId) -> GuardedSocket>{ + let hypervisor_client = nanoipc::init_client::>(hv_url).unwrap(); + hypervisor_client.handshake().unwrap(); + hypervisor_client.module_ready(module_id); + + hypervisor_client +} + +pub fn dependency>(url: &str) + -> Result, BootError> +{ + nanoipc::init_client::(url).map_err(|socket_err| BootError::DependencyConnect(socket_err)) +} + +pub fn main_thread() -> Arc { + let stop = Arc::new(AtomicBool::new(false)); + let ctrc_stop = stop.clone(); + CtrlC::set_handler(move || { + ctrc_stop.store(true, Ordering::Relaxed); + }); + stop +} + +pub fn setup_cli_logger(svc_name: &str) { + let usage = format!(" +Ethcore {} service +Usage: + parity {} [options] + + Options: + -l --logging LOGGING Specify the logging level. Must conform to the same + format as RUST_LOG. + --log-file FILENAME Specify a filename into which logging should be + directed. + --no-color Don't use terminal color codes in output. +", svc_name, svc_name); + + #[derive(Debug, RustcDecodable)] + struct Args { + flag_logging: Option, + flag_log_file: Option, + flag_no_color: bool, + } + + impl Args { + pub fn log_settings(&self) -> LogConfig { + LogConfig { + color: self.flag_no_color || cfg!(windows), + mode: self.flag_logging.clone(), + file: self.flag_log_file.clone(), + } + } + } + + let args: Args = Docopt::new(usage) + .and_then(|d| d.decode()) + .unwrap_or_else(|e| e.exit()); + setup_log(&args.log_settings()).expect("Log initialization failure"); +} diff --git a/parity/main.rs b/parity/main.rs index bb9f5e743..406645f06 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -57,9 +57,24 @@ extern crate lazy_static; extern crate regex; extern crate isatty; +#[cfg(feature="stratum")] +extern crate ethcore_stratum; + #[cfg(feature = "dapps")] extern crate ethcore_dapps; +macro_rules! dependency { + ($dep_ty:ident, $url:expr) => { + { + let dep = boot::dependency::<$dep_ty<_>>($url) + .unwrap_or_else(|e| panic!("Fatal: error connecting service ({:?})", e)); + dep.handshake() + .unwrap_or_else(|e| panic!("Fatal: error in connected service ({:?})", e)); + dep + } + } +} + mod cache; mod upgrade; mod rpc; @@ -83,6 +98,10 @@ mod presale; mod run; mod sync; mod snapshot; +mod boot; + +#[cfg(feature="stratum")] +mod stratum; use std::{process, env}; use cli::print_version; @@ -116,6 +135,25 @@ fn start() -> Result { execute(cmd) } +#[cfg(feature="stratum")] +mod stratum_optional { + pub fn probably_run() -> bool { + // just redirect to the stratum::main() + if ::std::env::args().nth(1).map_or(false, |arg| arg == "stratum") { + super::stratum::main(); + true + } + else { false } + } +} + +#[cfg(not(feature="stratum"))] +mod stratum_optional { + pub fn probably_run() -> bool { + false + } +} + fn main() { // just redirect to the sync::main() if std::env::args().nth(1).map_or(false, |arg| arg == "sync") { @@ -123,6 +161,8 @@ fn main() { return; } + if stratum_optional::probably_run() { return; } + match start() { Ok(result) => { println!("{}", result); diff --git a/parity/modules.rs b/parity/modules.rs index 20f2567ce..83ae44802 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -32,6 +32,11 @@ pub mod service_urls { pub const SYNC: &'static str = "parity-sync.ipc"; pub const SYNC_NOTIFY: &'static str = "parity-sync-notify.ipc"; pub const NETWORK_MANAGER: &'static str = "parity-manage-net.ipc"; + #[cfg(feature="stratum")] + pub const STRATUM: &'static str = "parity-stratum.ipc"; + #[cfg(feature="stratum")] + pub const MINING_JOB_DISPATCHER: &'static str = "parity-mining-jobs.ipc"; + pub fn with_base(data_dir: &str, service_path: &str) -> String { let mut path = PathBuf::from(data_dir); diff --git a/parity/stratum.rs b/parity/stratum.rs new file mode 100644 index 000000000..32c7b8a50 --- /dev/null +++ b/parity/stratum.rs @@ -0,0 +1,57 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Parity sync service + +use std; +use std::sync::Arc; +use ethcore_stratum::{Stratum as StratumServer, PushWorkHandler, RemoteJobDispatcher, ServiceConfiguration}; +use std::thread; +use modules::service_urls; +use boot; +use hypervisor::service::IpcModuleId; +use std::net::SocketAddr; +use std::str::FromStr; + +const STRATUM_MODULE_ID: IpcModuleId = 8000; + +pub fn main() { + boot::setup_cli_logger("stratum"); + + let service_config: ServiceConfiguration = boot::payload() + .unwrap_or_else(|e| panic!("Fatal: error reading boot arguments ({:?})", e)); + + let job_dispatcher = dependency!(RemoteJobDispatcher, service_urls::MINING_JOB_DISPATCHER); + + let stop = boot::main_thread(); + let server = + StratumServer::start( + &SocketAddr::from_str(&service_config.listen_addr) + .unwrap_or_else(|e| panic!("Fatal: invalid listen address ({:?})", e)), + job_dispatcher.service().clone(), + service_config.secret + ).unwrap_or_else( + |e| panic!("Fatal: cannot start stratum server({:?})", e) + ); + + boot::host_service(service_urls::STRATUM, stop.clone(), server.clone() as Arc); + + let _ = boot::register(STRATUM_MODULE_ID); + + while !stop.load(::std::sync::atomic::Ordering::Relaxed) { + thread::park_timeout(std::time::Duration::from_millis(1000)); + } +} diff --git a/parity/sync.rs b/parity/sync.rs index 382c1806d..5d3056acd 100644 --- a/parity/sync.rs +++ b/parity/sync.rs @@ -16,113 +16,48 @@ //! Parity sync service -use nanoipc; -use ipc; use std; use std::sync::Arc; -use hypervisor::{HypervisorServiceClient, SYNC_MODULE_ID, HYPERVISOR_IPC_URL}; -use ctrlc::CtrlC; -use std::sync::atomic::{AtomicBool, Ordering}; -use docopt::Docopt; +use hypervisor::{SYNC_MODULE_ID, HYPERVISOR_IPC_URL}; use ethcore::client::{RemoteClient, ChainNotify}; use ethsync::{SyncProvider, EthSync, ManageNetwork, ServiceConfiguration}; use std::thread; -use nanoipc::IpcInterface; use modules::service_urls; -use ethcore_logger::{Config as LogConfig, setup_log}; - -const USAGE: &'static str = " -Ethcore sync service -Usage: - parity sync [options] - - Options: - -l --logging LOGGING Specify the logging level. Must conform to the same - format as RUST_LOG. - --log-file FILENAME Specify a filename into which logging should be - directed. - --no-color Don't use terminal color codes in output. -"; - -#[derive(Debug, RustcDecodable)] -struct Args { - flag_logging: Option, - flag_log_file: Option, - flag_no_color: bool, -} - -impl Args { - pub fn log_settings(&self) -> LogConfig { - LogConfig { - color: self.flag_no_color || cfg!(windows), - mode: self.flag_logging.clone(), - file: self.flag_log_file.clone(), - } - } -} - -fn run_service(addr: &str, stop_guard: Arc, service: Arc) where T: IpcInterface { - let socket_url = addr.to_owned(); - std::thread::spawn(move || { - let mut worker = nanoipc::Worker::::new(&service); - worker.add_reqrep(&socket_url).unwrap(); - - while !stop_guard.load(Ordering::Relaxed) { - worker.poll(); - } - }); -} +use boot; pub fn main() { - use std::io::{self, Read}; + boot::setup_cli_logger("sync"); - let args: Args = Docopt::new(USAGE) - .and_then(|d| d.decode()) - .unwrap_or_else(|e| e.exit()); + let service_config: ServiceConfiguration = boot::payload() + .unwrap_or_else(|e| panic!("Fatal: error reading boot arguments ({:?})", e)); - setup_log(&args.log_settings()).expect("Log initialization failure"); + let remote_client = dependency!(RemoteClient, &service_urls::with_base(&service_config.io_path, service_urls::CLIENT)); - let mut buffer = Vec::new(); - io::stdin().read_to_end(&mut buffer).expect("Failed to read initialisation payload"); - let service_config = ipc::binary::deserialize::(&buffer).expect("Failed deserializing initialisation payload"); - - let remote_client = nanoipc::init_client::>( - &service_urls::with_base(&service_config.io_path, service_urls::CLIENT), - ).unwrap(); - - remote_client.handshake().unwrap(); - - let stop = Arc::new(AtomicBool::new(false)); + let stop = boot::main_thread(); let sync = EthSync::new(service_config.sync, remote_client.service().clone(), service_config.net).unwrap(); - run_service( + let _ = boot::register( + &service_urls::with_base(&service_config.io_path, HYPERVISOR_IPC_URL), + SYNC_MODULE_ID + ); + + boot::host_service( &service_urls::with_base(&service_config.io_path, service_urls::SYNC), stop.clone(), sync.clone() as Arc ); - run_service( + boot::host_service( &service_urls::with_base(&service_config.io_path, service_urls::NETWORK_MANAGER), stop.clone(), sync.clone() as Arc ); - run_service( + boot::host_service( &service_urls::with_base(&service_config.io_path, service_urls::SYNC_NOTIFY), stop.clone(), sync.clone() as Arc ); - let hypervisor_client = nanoipc::init_client::>( - &service_urls::with_base(&service_config.io_path, HYPERVISOR_IPC_URL), - ).unwrap(); - hypervisor_client.handshake().unwrap(); - hypervisor_client.module_ready(SYNC_MODULE_ID); - - let terminate_stop = stop.clone(); - CtrlC::set_handler(move || { - terminate_stop.store(true, Ordering::Relaxed); - }); - - while !stop.load(Ordering::Relaxed) { + while !stop.load(::std::sync::atomic::Ordering::Relaxed) { thread::park_timeout(std::time::Duration::from_millis(1000)); } } diff --git a/stratum/Cargo.toml b/stratum/Cargo.toml index 7fc8fa6c3..958e807be 100644 --- a/stratum/Cargo.toml +++ b/stratum/Cargo.toml @@ -4,6 +4,10 @@ name = "ethcore-stratum" version = "1.4.0" license = "GPL-3.0" authors = ["Ethcore "] +build = "build.rs" + +[build-dependencies] +ethcore-ipc-codegen = { path = "../ipc/codegen" } [dependencies] log = "0.3" @@ -14,6 +18,9 @@ ethcore-util = { path = "../util" } ethcore-devtools = { path = "../devtools" } lazy_static = "0.2" env_logger = "0.3" +ethcore-ipc = { path = "../ipc/rpc" } +semver = "0.2" +ethcore-ipc-nano = { path = "../ipc/nano" } [profile.release] debug = true diff --git a/stratum/build.rs b/stratum/build.rs new file mode 100644 index 000000000..61fa5098f --- /dev/null +++ b/stratum/build.rs @@ -0,0 +1,21 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +extern crate ethcore_ipc_codegen; + +fn main() { + ethcore_ipc_codegen::derive_ipc("src/traits.rs").unwrap(); +} diff --git a/stratum/src/lib.rs b/stratum/src/lib.rs index ccbfa6b57..c54eeea62 100644 --- a/stratum/src/lib.rs +++ b/stratum/src/lib.rs @@ -20,6 +20,8 @@ extern crate json_tcp_server; extern crate jsonrpc_core; #[macro_use] extern crate log; extern crate ethcore_util as util; +extern crate ethcore_ipc as ipc; +extern crate semver; #[cfg(test)] extern crate mio; @@ -31,9 +33,16 @@ extern crate env_logger; #[macro_use] extern crate lazy_static; -mod traits; +mod traits { + //! Stratum ipc interfaces specification + #![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues + include!(concat!(env!("OUT_DIR"), "/traits.rs")); +} -pub use traits::{JobDispatcher, PushWorkHandler, Error}; +pub use traits::{ + JobDispatcher, PushWorkHandler, Error, ServiceConfiguration, + RemoteWorkHandler, RemoteJobDispatcher, +}; use json_tcp_server::Server as JsonRpcServer; use jsonrpc_core::{IoHandler, Params, IoDelegate, to_value, from_params}; @@ -133,8 +142,8 @@ impl Stratum { let mut job_que = self.job_que.write(); let workers = self.workers.read(); for socket_addr in job_que.drain() { - if let Some(ref worker_id) = workers.get(&socket_addr) { - let job_payload = self.dispatcher.job(worker_id); + if let Some(worker_id) = workers.get(&socket_addr) { + let job_payload = self.dispatcher.job(worker_id.to_owned()); job_payload.map( |json| self.rpc_server.push_message(&socket_addr, json.as_bytes()) ); diff --git a/stratum/src/traits.rs b/stratum/src/traits.rs index 8bd169ad6..339f753b5 100644 --- a/stratum/src/traits.rs +++ b/stratum/src/traits.rs @@ -14,12 +14,12 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -//! Stratum ipc interfaces specification - use std; use std::error::Error as StdError; +use util::H256; +use ipc::IpcConfig; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Binary)] pub enum Error { NoWork, NoWorkers, @@ -32,6 +32,8 @@ impl From for Error { } } +#[derive(Ipc)] +#[ipc(client_ident="RemoteJobDispatcher")] /// Interface that can provide pow/blockchain-specific responses for the clients pub trait JobDispatcher: Send + Sync { // json for initial client handshake @@ -39,9 +41,11 @@ pub trait JobDispatcher: Send + Sync { // json for difficulty dispatch fn difficulty(&self) -> Option { None } // json for job update given worker_id (payload manager should split job!) - fn job(&self, _worker_id: &str) -> Option { None } + fn job(&self, _worker_id: String) -> Option { None } } +#[derive(Ipc)] +#[ipc(client_ident="RemoteWorkHandler")] /// Interface that can handle requests to push job for workers pub trait PushWorkHandler: Send + Sync { /// push the same work package for all workers (`payload`: json of pow-specific set of work specification) @@ -50,3 +54,12 @@ pub trait PushWorkHandler: Send + Sync { /// push the work packages worker-wise (`payload`: json of pow-specific set of work specification) fn push_work(&self, payloads: Vec) -> Result<(), Error>; } + +#[derive(Binary)] +pub struct ServiceConfiguration { + pub listen_addr: String, + pub secret: Option, +} + +impl IpcConfig for PushWorkHandler { } +impl IpcConfig for JobDispatcher { }