From 070a2157e6babf88f8b0ba457441b7b09631f1b3 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Fri, 19 Aug 2016 12:20:39 +0400 Subject: [PATCH] Stratum protocol general (#1954) * stratum stub * basic subscription * workers, authorizations stubs * push messages * authorizing workers * push tests (failing) * traits.rs forgotten * version bump * parking lot rwlock * trace for else * various fixes * fix last test * bump version * logger under test mod * dependencies dependant * extra line demolition --- Cargo.lock | 29 ++++ Cargo.toml | 1 + stratum/Cargo.toml | 20 +++ stratum/src/lib.rs | 380 ++++++++++++++++++++++++++++++++++++++++++ stratum/src/traits.rs | 52 ++++++ 5 files changed, 482 insertions(+) create mode 100644 stratum/Cargo.toml create mode 100644 stratum/src/lib.rs create mode 100644 stratum/src/traits.rs diff --git a/Cargo.lock b/Cargo.lock index b3aca5d7e..b5194c97b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,6 +20,7 @@ dependencies = [ "ethcore-logger 1.4.0", "ethcore-rpc 1.4.0", "ethcore-signer 1.4.0", + "ethcore-stratum 1.4.0", "ethcore-util 1.4.0", "ethsync 1.4.0", "fdlimit 0.1.0", @@ -449,6 +450,20 @@ dependencies = [ "ws 0.5.2 (git+https://github.com/ethcore/ws-rs.git?branch=mio-upstream-stable)", ] +[[package]] +name = "ethcore-stratum" +version = "1.4.0" +dependencies = [ + "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "ethcore-devtools 1.4.0", + "ethcore-util 1.4.0", + "json-tcp-server 0.1.0 (git+https://github.com/ethcore/json-tcp-server)", + "jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.5.1 (git+https://github.com/ethcore/mio?branch=v0.5.x)", +] + [[package]] name = "ethcore-util" version = "1.4.0" @@ -709,6 +724,20 @@ dependencies = [ "slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "json-tcp-server" +version = "0.1.0" +source = "git+https://github.com/ethcore/json-tcp-server#05c186ea100e2107c1f9f83ca4c62cb6ed2c68bd" +dependencies = [ + "bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "jsonrpc-core" version = "2.1.1" diff --git a/Cargo.toml b/Cargo.toml index 6529bcce0..2ef6c24a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ ethcore-logger = { path = "logger" } json-ipc-server = { git = "https://github.com/ethcore/json-ipc-server.git" } ethcore-dapps = { path = "dapps", optional = true } clippy = { version = "0.0.82", optional = true} +ethcore-stratum = { path = "stratum" } [target.'cfg(windows)'.dependencies] winapi = "0.2" diff --git a/stratum/Cargo.toml b/stratum/Cargo.toml new file mode 100644 index 000000000..7fc8fa6c3 --- /dev/null +++ b/stratum/Cargo.toml @@ -0,0 +1,20 @@ +[package] +description = "Ethcore stratum lib" +name = "ethcore-stratum" +version = "1.4.0" +license = "GPL-3.0" +authors = ["Ethcore "] + +[dependencies] +log = "0.3" +json-tcp-server = { git = "https://github.com/ethcore/json-tcp-server" } +jsonrpc-core = "2.1" +mio = { git = "https://github.com/ethcore/mio", branch = "v0.5.x" } +ethcore-util = { path = "../util" } +ethcore-devtools = { path = "../devtools" } +lazy_static = "0.2" +env_logger = "0.3" + +[profile.release] +debug = true +lto = false diff --git a/stratum/src/lib.rs b/stratum/src/lib.rs new file mode 100644 index 000000000..ccbfa6b57 --- /dev/null +++ b/stratum/src/lib.rs @@ -0,0 +1,380 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Stratum protocol implementation for parity ethereum/bitcoin clients + +extern crate json_tcp_server; +extern crate jsonrpc_core; +#[macro_use] extern crate log; +extern crate ethcore_util as util; + +#[cfg(test)] +extern crate mio; +#[cfg(test)] +extern crate ethcore_devtools as devtools; +#[cfg(test)] +extern crate env_logger; +#[cfg(test)] +#[macro_use] +extern crate lazy_static; + +mod traits; + +pub use traits::{JobDispatcher, PushWorkHandler, Error}; + +use json_tcp_server::Server as JsonRpcServer; +use jsonrpc_core::{IoHandler, Params, IoDelegate, to_value, from_params}; +use std::sync::Arc; + +use std::net::SocketAddr; +use std::collections::{HashSet, HashMap}; +use util::{H256, Hashable, RwLock, RwLockReadGuard}; + +pub struct Stratum { + rpc_server: JsonRpcServer, + handler: Arc, + /// Subscribed clients + subscribers: RwLock>, + /// List of workers supposed to receive job update + job_que: RwLock>, + /// Payload manager + dispatcher: Arc, + /// Authorized workers (socket - worker_id) + workers: Arc>>, + /// Secret if any + secret: Option, +} + +impl Stratum { + pub fn start( + addr: &SocketAddr, + dispatcher: Arc, + secret: Option, + ) -> Result, json_tcp_server::Error> { + let handler = Arc::new(IoHandler::new()); + let server = try!(JsonRpcServer::new(addr, &handler)); + let stratum = Arc::new(Stratum { + rpc_server: server, + handler: handler, + subscribers: RwLock::new(Vec::new()), + job_que: RwLock::new(HashSet::new()), + dispatcher: dispatcher, + workers: Arc::new(RwLock::new(HashMap::new())), + secret: secret, + }); + + let mut delegate = IoDelegate::::new(stratum.clone()); + delegate.add_method("miner.subscribe", Stratum::subscribe); + delegate.add_method("miner.authorize", Stratum::authorize); + stratum.handler.add_delegate(delegate); + + try!(stratum.rpc_server.run_async()); + + Ok(stratum) + } + + fn subscribe(&self, _params: Params) -> std::result::Result { + use std::str::FromStr; + + if let Some(context) = self.rpc_server.request_context() { + self.subscribers.write().push(context.socket_addr); + self.job_que.write().insert(context.socket_addr); + trace!(target: "stratum", "Subscription request from {:?}", context.socket_addr); + } + Ok(match self.dispatcher.initial() { + Some(initial) => match jsonrpc_core::Value::from_str(&initial) { + Ok(val) => val, + Err(e) => { + warn!(target: "stratum", "Invalid payload: '{}' ({:?})", &initial, e); + try!(to_value(&[0u8; 0])) + }, + }, + None => try!(to_value(&[0u8; 0])), + }) + } + + fn authorize(&self, params: Params) -> std::result::Result { + from_params::<(String, String)>(params).and_then(|(worker_id, secret)|{ + if let Some(valid_secret) = self.secret { + let hash = secret.sha3(); + if hash != valid_secret { + return to_value(&false); + } + } + if let Some(context) = self.rpc_server.request_context() { + self.workers.write().insert(context.socket_addr, worker_id); + to_value(&true) + } + else { + warn!(target: "stratum", "Authorize without valid context received!"); + to_value(&false) + } + }) + } + + pub fn subscribers(&self) -> RwLockReadGuard> { + self.subscribers.read() + } + + pub fn maintain(&self) { + let mut job_que = self.job_que.write(); + let workers = self.workers.read(); + for socket_addr in job_que.drain() { + if let Some(ref worker_id) = workers.get(&socket_addr) { + let job_payload = self.dispatcher.job(worker_id); + job_payload.map( + |json| self.rpc_server.push_message(&socket_addr, json.as_bytes()) + ); + } + else { + trace!( + target: "stratum", + "Job queued for worker that is still not authorized, skipping ('{:?}')", socket_addr + ); + } + } + } +} + +impl PushWorkHandler for Stratum { + fn push_work_all(&self, payload: String) -> Result<(), Error> { + let workers = self.workers.read(); + println!("pushing work for {} workers", workers.len()); + for (ref addr, _) in workers.iter() { + try!(self.rpc_server.push_message(addr, payload.as_bytes())); + } + Ok(()) + } + + fn push_work(&self, payloads: Vec) -> Result<(), Error> { + if !payloads.len() > 0 { + return Err(Error::NoWork); + } + let workers = self.workers.read(); + let addrs = workers.keys().collect::>(); + if !workers.len() > 0 { + return Err(Error::NoWorkers); + } + let mut que = payloads; + let mut addr_index = 0; + while que.len() > 0 { + let next_worker = addrs[addr_index]; + let mut next_payload = que.drain(0..1); + try!( + self.rpc_server.push_message( + next_worker, + next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist").as_bytes() + ) + ); + addr_index = addr_index + 1; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + use std::net::SocketAddr; + use std::sync::{Arc, RwLock}; + use std::thread; + + pub struct VoidManager; + + impl JobDispatcher for VoidManager { } + + lazy_static! { + static ref LOG_DUMMY: bool = { + use log::LogLevelFilter; + use env_logger::LogBuilder; + use std::env; + + let mut builder = LogBuilder::new(); + builder.filter(None, LogLevelFilter::Info); + + if let Ok(log) = env::var("RUST_LOG") { + builder.parse(&log); + } + + if let Ok(_) = builder.init() { + println!("logger initialized"); + } + true + }; + } + + /// Intialize log with default settings + #[cfg(test)] + fn init_log() { + let _ = *LOG_DUMMY; + } + + pub fn dummy_request(addr: &SocketAddr, buf: &[u8]) -> Vec { + use std::io::{Read, Write}; + use mio::*; + use mio::tcp::*; + + let mut poll = Poll::new().unwrap(); + let mut sock = TcpStream::connect(addr).unwrap(); + poll.register(&sock, Token(0), EventSet::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + poll.poll(Some(50)).unwrap(); + sock.write_all(buf).unwrap(); + poll.reregister(&sock, Token(0), EventSet::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + poll.poll(Some(50)).unwrap(); + + let mut buf = Vec::new(); + sock.read_to_end(&mut buf).unwrap_or_else(|_| { 0 }); + buf + } + + pub fn dummy_async_waiter(addr: &SocketAddr, initial: Vec, result: Arc>>) -> ::devtools::StopGuard { + use std::io::{Read, Write}; + use mio::*; + use mio::tcp::*; + use std::sync::atomic::Ordering; + + let stop_guard = ::devtools::StopGuard::new(); + let collector = result.clone(); + let thread_stop = stop_guard.share(); + let socket_addr = addr.clone(); + thread::spawn(move || { + let mut poll = Poll::new().unwrap(); + let mut sock = TcpStream::connect(&socket_addr).unwrap(); + poll.register(&sock, Token(0), EventSet::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + + for initial_req in initial { + poll.poll(Some(120)).unwrap(); + sock.write_all(initial_req.as_bytes()).unwrap(); + poll.reregister(&sock, Token(0), EventSet::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + poll.poll(Some(120)).unwrap(); + + let mut buf = Vec::new(); + sock.read_to_end(&mut buf).unwrap_or_else(|_| { 0 }); + collector.write().unwrap().push(String::from_utf8(buf).unwrap()); + poll.reregister(&sock, Token(0), EventSet::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + } + + while !thread_stop.load(Ordering::Relaxed) { + poll.reregister(&sock, Token(0), EventSet::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); + poll.poll(Some(120)).unwrap(); + + let mut buf = Vec::new(); + sock.read_to_end(&mut buf).unwrap_or_else(|_| { 0 }); + if buf.len() > 0 { + collector.write().unwrap().push(String::from_utf8(buf).unwrap()); + } + } + }); + + stop_guard + } + + #[test] + fn can_be_started() { + let stratum = Stratum::start(&SocketAddr::from_str("0.0.0.0:19980").unwrap(), Arc::new(VoidManager), None); + assert!(stratum.is_ok()); + } + + #[test] + fn records_subscriber() { + let addr = SocketAddr::from_str("0.0.0.0:19985").unwrap(); + let stratum = Stratum::start(&addr, Arc::new(VoidManager), None).unwrap(); + let request = r#"{"jsonrpc": "2.0", "method": "miner.subscribe", "params": [], "id": 1}"#; + dummy_request(&addr, request.as_bytes()); + assert_eq!(1, stratum.subscribers.read().len()); + } + + struct DummyManager { + initial_payload: String + } + + impl DummyManager { + fn new() -> Arc { + Arc::new(Self::build()) + } + + fn build() -> DummyManager { + DummyManager { initial_payload: r#"[ "dummy payload" ]"#.to_owned() } + } + + fn of_initial(mut self, new_initial: &str) -> DummyManager { + self.initial_payload = new_initial.to_owned(); + self + } + } + + impl JobDispatcher for DummyManager { + fn initial(&self) -> Option { + Some(self.initial_payload.clone()) + } + } + + #[test] + fn receives_initial_paylaod() { + let addr = SocketAddr::from_str("0.0.0.0:19975").unwrap(); + Stratum::start(&addr, DummyManager::new(), None).unwrap(); + let request = r#"{"jsonrpc": "2.0", "method": "miner.subscribe", "params": [], "id": 1}"#; + + let response = String::from_utf8(dummy_request(&addr, request.as_bytes())).unwrap(); + + assert_eq!(r#"{"jsonrpc":"2.0","result":["dummy payload"],"id":1}"#, response); + } + + #[test] + fn can_authorize() { + let addr = SocketAddr::from_str("0.0.0.0:19970").unwrap(); + let stratum = Stratum::start( + &addr, + Arc::new(DummyManager::build().of_initial(r#"["dummy autorize payload"]"#)), + None + ).unwrap(); + + let request = r#"{"jsonrpc": "2.0", "method": "miner.authorize", "params": ["miner1", ""], "id": 1}"#; + + let response = String::from_utf8(dummy_request(&addr, request.as_bytes())).unwrap(); + + assert_eq!(r#"{"jsonrpc":"2.0","result":true,"id":1}"#, response); + assert_eq!(1, stratum.workers.read().len()); + } + + #[test] + fn can_push_work() { + init_log(); + + let addr = SocketAddr::from_str("0.0.0.0:19965").unwrap(); + let stratum = Stratum::start( + &addr, + Arc::new(DummyManager::build().of_initial(r#"["dummy push request payload"]"#)), + None + ).unwrap(); + + let result = Arc::new(RwLock::new(Vec::::new())); + let _stop = dummy_async_waiter( + &addr, + vec![ + r#"{"jsonrpc": "2.0", "method": "miner.authorize", "params": ["miner1", ""], "id": 1}"#.to_owned(), + ], + result.clone(), + ); + ::std::thread::park_timeout(::std::time::Duration::from_millis(150)); + + stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned()).unwrap(); + ::std::thread::park_timeout(::std::time::Duration::from_millis(150)); + + assert_eq!(2, result.read().unwrap().len()); + } +} diff --git a/stratum/src/traits.rs b/stratum/src/traits.rs new file mode 100644 index 000000000..8bd169ad6 --- /dev/null +++ b/stratum/src/traits.rs @@ -0,0 +1,52 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Stratum ipc interfaces specification + +use std; +use std::error::Error as StdError; + +#[derive(Debug, Clone)] +pub enum Error { + NoWork, + NoWorkers, + Io(String), +} + +impl From for Error { + fn from(err: std::io::Error) -> Self { + Error::Io(err.description().to_owned()) + } +} + +/// Interface that can provide pow/blockchain-specific responses for the clients +pub trait JobDispatcher: Send + Sync { + // json for initial client handshake + fn initial(&self) -> Option { None } + // json for difficulty dispatch + fn difficulty(&self) -> Option { None } + // json for job update given worker_id (payload manager should split job!) + fn job(&self, _worker_id: &str) -> Option { None } +} + +/// Interface that can handle requests to push job for workers +pub trait PushWorkHandler: Send + Sync { + /// push the same work package for all workers (`payload`: json of pow-specific set of work specification) + fn push_work_all(&self, payload: String) -> Result<(), Error>; + + /// push the work packages worker-wise (`payload`: json of pow-specific set of work specification) + fn push_work(&self, payloads: Vec) -> Result<(), Error>; +}