diff --git a/Cargo.lock b/Cargo.lock
index 5bd5e8f32..5c06c5a62 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -472,12 +472,16 @@ version = "1.4.0"
dependencies = [
"env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-devtools 1.4.0",
+ "ethcore-ipc 1.4.0",
+ "ethcore-ipc-codegen 1.4.0",
+ "ethcore-ipc-nano 1.4.0",
"ethcore-util 1.4.0",
"json-tcp-server 0.1.0 (git+https://github.com/ethcore/json-tcp-server)",
"jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.5.1 (git+https://github.com/ethcore/mio?branch=v0.5.x)",
+ "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 6ef8bcc7e..661544465 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -61,6 +61,7 @@ dapps = ["ethcore-dapps"]
ipc = ["ethcore/ipc"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev", "ethcore-dapps/dev", "ethcore-signer/dev"]
json-tests = ["ethcore/json-tests"]
+stratum = ["ipc"]
[[bin]]
path = "parity/main.rs"
diff --git a/parity/boot.rs b/parity/boot.rs
new file mode 100644
index 000000000..ddc05437c
--- /dev/null
+++ b/parity/boot.rs
@@ -0,0 +1,123 @@
+// Copyright 2015, 2016 Ethcore (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity. If not, see .
+
+//! Parity micro-service helpers
+
+use nanoipc;
+use ipc;
+use std;
+use std::sync::Arc;
+use hypervisor::HypervisorServiceClient;
+use hypervisor::service::IpcModuleId;
+use ctrlc::CtrlC;
+use std::sync::atomic::{AtomicBool, Ordering};
+use nanoipc::{IpcInterface, GuardedSocket, NanoSocket};
+use ipc::WithSocket;
+use ethcore_logger::{Config as LogConfig, setup_log};
+use docopt::Docopt;
+
+#[derive(Debug)]
+pub enum BootError {
+ ReadArgs(std::io::Error),
+ DecodeArgs(ipc::binary::BinaryError),
+ DependencyConnect(nanoipc::SocketError),
+}
+
+pub fn host_service(addr: &str, stop_guard: Arc, service: Arc) where T: IpcInterface {
+ let socket_url = addr.to_owned();
+ std::thread::spawn(move || {
+ let mut worker = nanoipc::Worker::::new(&service);
+ worker.add_reqrep(&socket_url).unwrap();
+
+ while !stop_guard.load(Ordering::Relaxed) {
+ worker.poll();
+ }
+ });
+}
+
+pub fn payload() -> Result {
+ use std::io;
+ use std::io::Read;
+
+ let mut buffer = Vec::new();
+ try!(
+ io::stdin().read_to_end(&mut buffer)
+ .map_err(|io_err| BootError::ReadArgs(io_err))
+ );
+
+ ipc::binary::deserialize::(&buffer)
+ .map_err(|binary_error| BootError::DecodeArgs(binary_error))
+}
+
+pub fn register(hv_url: &str, module_id: IpcModuleId) -> GuardedSocket>{
+ let hypervisor_client = nanoipc::init_client::>(hv_url).unwrap();
+ hypervisor_client.handshake().unwrap();
+ hypervisor_client.module_ready(module_id);
+
+ hypervisor_client
+}
+
+pub fn dependency>(url: &str)
+ -> Result, BootError>
+{
+ nanoipc::init_client::(url).map_err(|socket_err| BootError::DependencyConnect(socket_err))
+}
+
+pub fn main_thread() -> Arc {
+ let stop = Arc::new(AtomicBool::new(false));
+ let ctrc_stop = stop.clone();
+ CtrlC::set_handler(move || {
+ ctrc_stop.store(true, Ordering::Relaxed);
+ });
+ stop
+}
+
+pub fn setup_cli_logger(svc_name: &str) {
+ let usage = format!("
+Ethcore {} service
+Usage:
+ parity {} [options]
+
+ Options:
+ -l --logging LOGGING Specify the logging level. Must conform to the same
+ format as RUST_LOG.
+ --log-file FILENAME Specify a filename into which logging should be
+ directed.
+ --no-color Don't use terminal color codes in output.
+", svc_name, svc_name);
+
+ #[derive(Debug, RustcDecodable)]
+ struct Args {
+ flag_logging: Option,
+ flag_log_file: Option,
+ flag_no_color: bool,
+ }
+
+ impl Args {
+ pub fn log_settings(&self) -> LogConfig {
+ LogConfig {
+ color: self.flag_no_color || cfg!(windows),
+ mode: self.flag_logging.clone(),
+ file: self.flag_log_file.clone(),
+ }
+ }
+ }
+
+ let args: Args = Docopt::new(usage)
+ .and_then(|d| d.decode())
+ .unwrap_or_else(|e| e.exit());
+ setup_log(&args.log_settings()).expect("Log initialization failure");
+}
diff --git a/parity/main.rs b/parity/main.rs
index bb9f5e743..406645f06 100644
--- a/parity/main.rs
+++ b/parity/main.rs
@@ -57,9 +57,24 @@ extern crate lazy_static;
extern crate regex;
extern crate isatty;
+#[cfg(feature="stratum")]
+extern crate ethcore_stratum;
+
#[cfg(feature = "dapps")]
extern crate ethcore_dapps;
+macro_rules! dependency {
+ ($dep_ty:ident, $url:expr) => {
+ {
+ let dep = boot::dependency::<$dep_ty<_>>($url)
+ .unwrap_or_else(|e| panic!("Fatal: error connecting service ({:?})", e));
+ dep.handshake()
+ .unwrap_or_else(|e| panic!("Fatal: error in connected service ({:?})", e));
+ dep
+ }
+ }
+}
+
mod cache;
mod upgrade;
mod rpc;
@@ -83,6 +98,10 @@ mod presale;
mod run;
mod sync;
mod snapshot;
+mod boot;
+
+#[cfg(feature="stratum")]
+mod stratum;
use std::{process, env};
use cli::print_version;
@@ -116,6 +135,25 @@ fn start() -> Result {
execute(cmd)
}
+#[cfg(feature="stratum")]
+mod stratum_optional {
+ pub fn probably_run() -> bool {
+ // just redirect to the stratum::main()
+ if ::std::env::args().nth(1).map_or(false, |arg| arg == "stratum") {
+ super::stratum::main();
+ true
+ }
+ else { false }
+ }
+}
+
+#[cfg(not(feature="stratum"))]
+mod stratum_optional {
+ pub fn probably_run() -> bool {
+ false
+ }
+}
+
fn main() {
// just redirect to the sync::main()
if std::env::args().nth(1).map_or(false, |arg| arg == "sync") {
@@ -123,6 +161,8 @@ fn main() {
return;
}
+ if stratum_optional::probably_run() { return; }
+
match start() {
Ok(result) => {
println!("{}", result);
diff --git a/parity/modules.rs b/parity/modules.rs
index 20f2567ce..83ae44802 100644
--- a/parity/modules.rs
+++ b/parity/modules.rs
@@ -32,6 +32,11 @@ pub mod service_urls {
pub const SYNC: &'static str = "parity-sync.ipc";
pub const SYNC_NOTIFY: &'static str = "parity-sync-notify.ipc";
pub const NETWORK_MANAGER: &'static str = "parity-manage-net.ipc";
+ #[cfg(feature="stratum")]
+ pub const STRATUM: &'static str = "parity-stratum.ipc";
+ #[cfg(feature="stratum")]
+ pub const MINING_JOB_DISPATCHER: &'static str = "parity-mining-jobs.ipc";
+
pub fn with_base(data_dir: &str, service_path: &str) -> String {
let mut path = PathBuf::from(data_dir);
diff --git a/parity/stratum.rs b/parity/stratum.rs
new file mode 100644
index 000000000..32c7b8a50
--- /dev/null
+++ b/parity/stratum.rs
@@ -0,0 +1,57 @@
+// Copyright 2015, 2016 Ethcore (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity. If not, see .
+
+//! Parity sync service
+
+use std;
+use std::sync::Arc;
+use ethcore_stratum::{Stratum as StratumServer, PushWorkHandler, RemoteJobDispatcher, ServiceConfiguration};
+use std::thread;
+use modules::service_urls;
+use boot;
+use hypervisor::service::IpcModuleId;
+use std::net::SocketAddr;
+use std::str::FromStr;
+
+const STRATUM_MODULE_ID: IpcModuleId = 8000;
+
+pub fn main() {
+ boot::setup_cli_logger("stratum");
+
+ let service_config: ServiceConfiguration = boot::payload()
+ .unwrap_or_else(|e| panic!("Fatal: error reading boot arguments ({:?})", e));
+
+ let job_dispatcher = dependency!(RemoteJobDispatcher, service_urls::MINING_JOB_DISPATCHER);
+
+ let stop = boot::main_thread();
+ let server =
+ StratumServer::start(
+ &SocketAddr::from_str(&service_config.listen_addr)
+ .unwrap_or_else(|e| panic!("Fatal: invalid listen address ({:?})", e)),
+ job_dispatcher.service().clone(),
+ service_config.secret
+ ).unwrap_or_else(
+ |e| panic!("Fatal: cannot start stratum server({:?})", e)
+ );
+
+ boot::host_service(service_urls::STRATUM, stop.clone(), server.clone() as Arc);
+
+ let _ = boot::register(STRATUM_MODULE_ID);
+
+ while !stop.load(::std::sync::atomic::Ordering::Relaxed) {
+ thread::park_timeout(std::time::Duration::from_millis(1000));
+ }
+}
diff --git a/parity/sync.rs b/parity/sync.rs
index 382c1806d..5d3056acd 100644
--- a/parity/sync.rs
+++ b/parity/sync.rs
@@ -16,113 +16,48 @@
//! Parity sync service
-use nanoipc;
-use ipc;
use std;
use std::sync::Arc;
-use hypervisor::{HypervisorServiceClient, SYNC_MODULE_ID, HYPERVISOR_IPC_URL};
-use ctrlc::CtrlC;
-use std::sync::atomic::{AtomicBool, Ordering};
-use docopt::Docopt;
+use hypervisor::{SYNC_MODULE_ID, HYPERVISOR_IPC_URL};
use ethcore::client::{RemoteClient, ChainNotify};
use ethsync::{SyncProvider, EthSync, ManageNetwork, ServiceConfiguration};
use std::thread;
-use nanoipc::IpcInterface;
use modules::service_urls;
-use ethcore_logger::{Config as LogConfig, setup_log};
-
-const USAGE: &'static str = "
-Ethcore sync service
-Usage:
- parity sync [options]
-
- Options:
- -l --logging LOGGING Specify the logging level. Must conform to the same
- format as RUST_LOG.
- --log-file FILENAME Specify a filename into which logging should be
- directed.
- --no-color Don't use terminal color codes in output.
-";
-
-#[derive(Debug, RustcDecodable)]
-struct Args {
- flag_logging: Option,
- flag_log_file: Option,
- flag_no_color: bool,
-}
-
-impl Args {
- pub fn log_settings(&self) -> LogConfig {
- LogConfig {
- color: self.flag_no_color || cfg!(windows),
- mode: self.flag_logging.clone(),
- file: self.flag_log_file.clone(),
- }
- }
-}
-
-fn run_service(addr: &str, stop_guard: Arc, service: Arc) where T: IpcInterface {
- let socket_url = addr.to_owned();
- std::thread::spawn(move || {
- let mut worker = nanoipc::Worker::::new(&service);
- worker.add_reqrep(&socket_url).unwrap();
-
- while !stop_guard.load(Ordering::Relaxed) {
- worker.poll();
- }
- });
-}
+use boot;
pub fn main() {
- use std::io::{self, Read};
+ boot::setup_cli_logger("sync");
- let args: Args = Docopt::new(USAGE)
- .and_then(|d| d.decode())
- .unwrap_or_else(|e| e.exit());
+ let service_config: ServiceConfiguration = boot::payload()
+ .unwrap_or_else(|e| panic!("Fatal: error reading boot arguments ({:?})", e));
- setup_log(&args.log_settings()).expect("Log initialization failure");
+ let remote_client = dependency!(RemoteClient, &service_urls::with_base(&service_config.io_path, service_urls::CLIENT));
- let mut buffer = Vec::new();
- io::stdin().read_to_end(&mut buffer).expect("Failed to read initialisation payload");
- let service_config = ipc::binary::deserialize::(&buffer).expect("Failed deserializing initialisation payload");
-
- let remote_client = nanoipc::init_client::>(
- &service_urls::with_base(&service_config.io_path, service_urls::CLIENT),
- ).unwrap();
-
- remote_client.handshake().unwrap();
-
- let stop = Arc::new(AtomicBool::new(false));
+ let stop = boot::main_thread();
let sync = EthSync::new(service_config.sync, remote_client.service().clone(), service_config.net).unwrap();
- run_service(
+ let _ = boot::register(
+ &service_urls::with_base(&service_config.io_path, HYPERVISOR_IPC_URL),
+ SYNC_MODULE_ID
+ );
+
+ boot::host_service(
&service_urls::with_base(&service_config.io_path, service_urls::SYNC),
stop.clone(),
sync.clone() as Arc
);
- run_service(
+ boot::host_service(
&service_urls::with_base(&service_config.io_path, service_urls::NETWORK_MANAGER),
stop.clone(),
sync.clone() as Arc
);
- run_service(
+ boot::host_service(
&service_urls::with_base(&service_config.io_path, service_urls::SYNC_NOTIFY),
stop.clone(),
sync.clone() as Arc
);
- let hypervisor_client = nanoipc::init_client::>(
- &service_urls::with_base(&service_config.io_path, HYPERVISOR_IPC_URL),
- ).unwrap();
- hypervisor_client.handshake().unwrap();
- hypervisor_client.module_ready(SYNC_MODULE_ID);
-
- let terminate_stop = stop.clone();
- CtrlC::set_handler(move || {
- terminate_stop.store(true, Ordering::Relaxed);
- });
-
- while !stop.load(Ordering::Relaxed) {
+ while !stop.load(::std::sync::atomic::Ordering::Relaxed) {
thread::park_timeout(std::time::Duration::from_millis(1000));
}
}
diff --git a/stratum/Cargo.toml b/stratum/Cargo.toml
index 7fc8fa6c3..958e807be 100644
--- a/stratum/Cargo.toml
+++ b/stratum/Cargo.toml
@@ -4,6 +4,10 @@ name = "ethcore-stratum"
version = "1.4.0"
license = "GPL-3.0"
authors = ["Ethcore "]
+build = "build.rs"
+
+[build-dependencies]
+ethcore-ipc-codegen = { path = "../ipc/codegen" }
[dependencies]
log = "0.3"
@@ -14,6 +18,9 @@ ethcore-util = { path = "../util" }
ethcore-devtools = { path = "../devtools" }
lazy_static = "0.2"
env_logger = "0.3"
+ethcore-ipc = { path = "../ipc/rpc" }
+semver = "0.2"
+ethcore-ipc-nano = { path = "../ipc/nano" }
[profile.release]
debug = true
diff --git a/stratum/build.rs b/stratum/build.rs
new file mode 100644
index 000000000..61fa5098f
--- /dev/null
+++ b/stratum/build.rs
@@ -0,0 +1,21 @@
+// Copyright 2015, 2016 Ethcore (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity. If not, see .
+
+extern crate ethcore_ipc_codegen;
+
+fn main() {
+ ethcore_ipc_codegen::derive_ipc("src/traits.rs").unwrap();
+}
diff --git a/stratum/src/lib.rs b/stratum/src/lib.rs
index ccbfa6b57..c54eeea62 100644
--- a/stratum/src/lib.rs
+++ b/stratum/src/lib.rs
@@ -20,6 +20,8 @@ extern crate json_tcp_server;
extern crate jsonrpc_core;
#[macro_use] extern crate log;
extern crate ethcore_util as util;
+extern crate ethcore_ipc as ipc;
+extern crate semver;
#[cfg(test)]
extern crate mio;
@@ -31,9 +33,16 @@ extern crate env_logger;
#[macro_use]
extern crate lazy_static;
-mod traits;
+mod traits {
+ //! Stratum ipc interfaces specification
+ #![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues
+ include!(concat!(env!("OUT_DIR"), "/traits.rs"));
+}
-pub use traits::{JobDispatcher, PushWorkHandler, Error};
+pub use traits::{
+ JobDispatcher, PushWorkHandler, Error, ServiceConfiguration,
+ RemoteWorkHandler, RemoteJobDispatcher,
+};
use json_tcp_server::Server as JsonRpcServer;
use jsonrpc_core::{IoHandler, Params, IoDelegate, to_value, from_params};
@@ -133,8 +142,8 @@ impl Stratum {
let mut job_que = self.job_que.write();
let workers = self.workers.read();
for socket_addr in job_que.drain() {
- if let Some(ref worker_id) = workers.get(&socket_addr) {
- let job_payload = self.dispatcher.job(worker_id);
+ if let Some(worker_id) = workers.get(&socket_addr) {
+ let job_payload = self.dispatcher.job(worker_id.to_owned());
job_payload.map(
|json| self.rpc_server.push_message(&socket_addr, json.as_bytes())
);
diff --git a/stratum/src/traits.rs b/stratum/src/traits.rs
index 8bd169ad6..339f753b5 100644
--- a/stratum/src/traits.rs
+++ b/stratum/src/traits.rs
@@ -14,12 +14,12 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see .
-//! Stratum ipc interfaces specification
-
use std;
use std::error::Error as StdError;
+use util::H256;
+use ipc::IpcConfig;
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Binary)]
pub enum Error {
NoWork,
NoWorkers,
@@ -32,6 +32,8 @@ impl From for Error {
}
}
+#[derive(Ipc)]
+#[ipc(client_ident="RemoteJobDispatcher")]
/// Interface that can provide pow/blockchain-specific responses for the clients
pub trait JobDispatcher: Send + Sync {
// json for initial client handshake
@@ -39,9 +41,11 @@ pub trait JobDispatcher: Send + Sync {
// json for difficulty dispatch
fn difficulty(&self) -> Option { None }
// json for job update given worker_id (payload manager should split job!)
- fn job(&self, _worker_id: &str) -> Option { None }
+ fn job(&self, _worker_id: String) -> Option { None }
}
+#[derive(Ipc)]
+#[ipc(client_ident="RemoteWorkHandler")]
/// Interface that can handle requests to push job for workers
pub trait PushWorkHandler: Send + Sync {
/// push the same work package for all workers (`payload`: json of pow-specific set of work specification)
@@ -50,3 +54,12 @@ pub trait PushWorkHandler: Send + Sync {
/// push the work packages worker-wise (`payload`: json of pow-specific set of work specification)
fn push_work(&self, payloads: Vec) -> Result<(), Error>;
}
+
+#[derive(Binary)]
+pub struct ServiceConfiguration {
+ pub listen_addr: String,
+ pub secret: Option,
+}
+
+impl IpcConfig for PushWorkHandler { }
+impl IpcConfig for JobDispatcher { }