// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see .
//! Stratum protocol implementation for parity ethereum/bitcoin clients
extern crate jsonrpc_tcp_server;
extern crate jsonrpc_core;
extern crate jsonrpc_macros;
#[macro_use] extern crate log;
extern crate ethcore_util as util;
extern crate ethcore_ipc as ipc;
extern crate semver;
extern crate futures;
#[cfg(test)] extern crate tokio_core;
extern crate ethcore_devtools as devtools;
#[cfg(test)] extern crate env_logger;
#[cfg(test)] #[macro_use] extern crate lazy_static;
use futures::{future, BoxFuture, Future};
mod traits {
//! Stratum ipc interfaces specification
#![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues
include!(concat!(env!("OUT_DIR"), "/traits.rs"));
}
pub use traits::{
JobDispatcher, PushWorkHandler, Error, ServiceConfiguration,
RemoteWorkHandler, RemoteJobDispatcher,
};
use jsonrpc_tcp_server::{
Server as JsonRpcServer, RequestContext, MetaExtractor, Dispatcher,
PushMessageError
};
use jsonrpc_core::{MetaIoHandler, Params, to_value, Value, Metadata, Compatibility};
use jsonrpc_macros::IoDelegate;
use std::sync::Arc;
use std::net::SocketAddr;
use std::collections::{HashSet, HashMap};
use util::{H256, Hashable, RwLock, RwLockReadGuard};
type RpcResult = BoxFuture;
struct StratumRpc {
stratum: RwLock>>,
}
impl StratumRpc {
fn subscribe(&self, params: Params, meta: SocketMetadata) -> RpcResult {
self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.")
.subscribe(params, meta)
}
fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult {
self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.")
.authorize(params, meta)
}
fn submit(&self, params: Params, meta: SocketMetadata) -> RpcResult {
self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.")
.submit(params, meta)
}
}
#[derive(Clone)]
pub struct SocketMetadata {
addr: SocketAddr,
}
impl Default for SocketMetadata {
fn default() -> Self {
SocketMetadata { addr: "0.0.0.0:0".parse().unwrap() }
}
}
impl SocketMetadata {
pub fn addr(&self) -> &SocketAddr {
&self.addr
}
}
impl Metadata for SocketMetadata { }
impl From for SocketMetadata {
fn from(addr: SocketAddr) -> SocketMetadata {
SocketMetadata { addr: addr }
}
}
pub struct PeerMetaExtractor;
impl MetaExtractor for PeerMetaExtractor {
fn extract(&self, context: &RequestContext) -> SocketMetadata {
context.peer_addr.into()
}
}
pub struct Stratum {
rpc_server: JsonRpcServer,
/// Subscribed clients
subscribers: RwLock>,
/// List of workers supposed to receive job update
job_que: RwLock>,
/// Payload manager
dispatcher: Arc,
/// Authorized workers (socket - worker_id)
workers: Arc>>,
/// Secret if any
secret: Option,
/// Dispatch notify couinter
notify_counter: RwLock,
/// Message dispatcher (tcp/ip service)
tcp_dispatcher: Dispatcher,
}
const NOTIFY_COUNTER_INITIAL: u32 = 16;
impl Stratum {
pub fn start(
addr: &SocketAddr,
dispatcher: Arc,
secret: Option,
) -> Result, Error> {
let rpc = Arc::new(StratumRpc {
stratum: RwLock::new(None),
});
let mut delegate = IoDelegate::::new(rpc.clone());
delegate.add_method_with_meta("mining.subscribe", StratumRpc::subscribe);
delegate.add_method_with_meta("mining.authorize", StratumRpc::authorize);
delegate.add_method_with_meta("mining.submit", StratumRpc::submit);
let mut handler = MetaIoHandler::::with_compatibility(Compatibility::Both);
handler.extend_with(delegate);
let server = JsonRpcServer::new(addr.clone(), Arc::new(handler))
.extractor(Arc::new(PeerMetaExtractor) as Arc>);
let stratum = Arc::new(Stratum {
tcp_dispatcher: server.dispatcher(),
rpc_server: server,
subscribers: RwLock::new(Vec::new()),
job_que: RwLock::new(HashSet::new()),
dispatcher: dispatcher,
workers: Arc::new(RwLock::new(HashMap::new())),
secret: secret,
notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL),
});
*rpc.stratum.write() = Some(stratum.clone());
let running_stratum = stratum.clone();
::std::thread::spawn(move || running_stratum.rpc_server.run());
Ok(stratum)
}
fn update_peers(&self) {
if let Some(job) = self.dispatcher.job() {
if let Err(e) = self.push_work_all(job) {
warn!("Failed to update some of the peers: {:?}", e);
}
}
}
fn submit(&self, params: Params, _meta: SocketMetadata) -> RpcResult {
future::ok(match params {
Params::Array(vals) => {
// first two elements are service messages (worker_id & job_id)
match self.dispatcher.submit(vals.iter().skip(2)
.filter_map(|val| match val { &Value::String(ref str) => Some(str.to_owned()), _ => None })
.collect::>()) {
Ok(()) => {
self.update_peers();
to_value(true)
},
Err(submit_err) => {
warn!("Error while submitting share: {:?}", submit_err);
to_value(false)
}
}
},
_ => {
trace!(target: "stratum", "Invalid submit work format {:?}", params);
to_value(false)
}
}.expect("Only true/false is returned and it's always serializable; qed")).boxed()
}
fn subscribe(&self, _params: Params, meta: SocketMetadata) -> RpcResult {
use std::str::FromStr;
self.subscribers.write().push(meta.addr().clone());
self.job_que.write().insert(meta.addr().clone());
trace!(target: "stratum", "Subscription request from {:?}", meta.addr());
future::ok(match self.dispatcher.initial() {
Some(initial) => match jsonrpc_core::Value::from_str(&initial) {
Ok(val) => Ok(val),
Err(e) => {
warn!(target: "stratum", "Invalid payload: '{}' ({:?})", &initial, e);
to_value(&[0u8; 0])
},
},
None => to_value(&[0u8; 0]),
}.expect("Empty slices are serializable; qed")).boxed()
}
fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult {
future::result(params.parse::<(String, String)>().map(|(worker_id, secret)|{
if let Some(valid_secret) = self.secret {
let hash = secret.sha3();
if hash != valid_secret {
return to_value(&false);
}
}
trace!(target: "stratum", "New worker #{} registered", worker_id);
self.workers.write().insert(meta.addr().clone(), worker_id);
to_value(true)
}).map(|v| v.expect("Only true/false is returned and it's always serializable; qed"))).boxed()
}
pub fn subscribers(&self) -> RwLockReadGuard> {
self.subscribers.read()
}
pub fn maintain(&self) {
let mut job_que = self.job_que.write();
let job_payload = self.dispatcher.job();
for socket_addr in job_que.drain() {
job_payload.as_ref().map(
|json| self.tcp_dispatcher.push_message(&socket_addr, json.to_owned())
);
}
}
}
impl PushWorkHandler for Stratum {
fn push_work_all(&self, payload: String) -> Result<(), Error> {
let hup_peers = {
let workers = self.workers.read();
let next_request_id = {
let mut counter = self.notify_counter.write();
if *counter == ::std::u32::MAX { *counter = NOTIFY_COUNTER_INITIAL; }
else { *counter = *counter + 1 }
*counter
};
let mut hup_peers = HashSet::with_capacity(0); // most of the cases won't be needed, hence avoid allocation
let workers_msg = format!("{{ \"id\": {}, \"method\": \"mining.notify\", \"params\": {} }}", next_request_id, payload);
trace!(target: "stratum", "pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg);
for (ref addr, _) in workers.iter() {
trace!(target: "stratum", "pusing work to {}", addr);
match self.tcp_dispatcher.push_message(addr, workers_msg.clone()) {
Err(PushMessageError::NoSuchPeer) => {
trace!(target: "stratum", "Worker no longer connected: {}", &addr);
hup_peers.insert(*addr.clone());
},
Err(e) => {
warn!(target: "stratum", "Unexpected transport error: {:?}", e);
},
Ok(_) => { },
}
}
hup_peers
};
if !hup_peers.is_empty() {
let mut workers = self.workers.write();
for hup_peer in hup_peers { workers.remove(&hup_peer); }
}
Ok(())
}
fn push_work(&self, payloads: Vec) -> Result<(), Error> {
if !payloads.len() > 0 {
return Err(Error::NoWork);
}
let workers = self.workers.read();
let addrs = workers.keys().collect::>();
if !workers.len() > 0 {
return Err(Error::NoWorkers);
}
let mut que = payloads;
let mut addr_index = 0;
while que.len() > 0 {
let next_worker = addrs[addr_index];
let mut next_payload = que.drain(0..1);
self.tcp_dispatcher.push_message(
next_worker,
next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist")
)?;
addr_index = addr_index + 1;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio_core::reactor::{Core, Timeout};
use tokio_core::net::TcpStream;
use tokio_core::io;
use futures::{Future, future};
pub struct VoidManager;
impl JobDispatcher for VoidManager {
fn submit(&self, _payload: Vec) -> Result<(), Error> {
Ok(())
}
}
lazy_static! {
static ref LOG_DUMMY: bool = {
use log::LogLevelFilter;
use env_logger::LogBuilder;
use std::env;
let mut builder = LogBuilder::new();
builder.filter(None, LogLevelFilter::Info);
if let Ok(log) = env::var("RUST_LOG") {
builder.parse(&log);
}
if let Ok(_) = builder.init() {
println!("logger initialized");
}
true
};
}
/// Intialize log with default settings
#[cfg(test)]
fn init_log() {
let _ = *LOG_DUMMY;
}
fn dummy_request(addr: &SocketAddr, data: &str) -> Vec {
let mut core = Core::new().expect("Tokio Core should be created with no errors");
let mut buffer = vec![0u8; 2048];
let mut data_vec = data.as_bytes().to_vec();
data_vec.extend(b"\n");
let stream = TcpStream::connect(addr, &core.handle())
.and_then(|stream| {
io::write_all(stream, &data_vec)
})
.and_then(|(stream, _)| {
io::read(stream, &mut buffer)
})
.and_then(|(_, read_buf, len)| {
future::ok(read_buf[0..len].to_vec())
});
let result = core.run(stream).expect("Core should run with no errors");
result
}
#[test]
fn can_be_started() {
let stratum = Stratum::start(&SocketAddr::from_str("127.0.0.1:19980").unwrap(), Arc::new(VoidManager), None);
assert!(stratum.is_ok());
}
#[test]
fn records_subscriber() {
init_log();
let addr = SocketAddr::from_str("127.0.0.1:19985").unwrap();
let stratum = Stratum::start(&addr, Arc::new(VoidManager), None).unwrap();
let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 1}"#;
dummy_request(&addr, request);
assert_eq!(1, stratum.subscribers.read().len());
}
struct DummyManager {
initial_payload: String
}
impl DummyManager {
fn new() -> Arc {
Arc::new(Self::build())
}
fn build() -> DummyManager {
DummyManager { initial_payload: r#"[ "dummy payload" ]"#.to_owned() }
}
fn of_initial(mut self, new_initial: &str) -> DummyManager {
self.initial_payload = new_initial.to_owned();
self
}
}
impl JobDispatcher for DummyManager {
fn initial(&self) -> Option {
Some(self.initial_payload.clone())
}
fn submit(&self, _payload: Vec) -> Result<(), Error> {
Ok(())
}
}
fn terminated_str(origin: &'static str) -> String {
let mut s = String::new();
s.push_str(origin);
s.push_str("\n");
s
}
#[test]
fn receives_initial_paylaod() {
let addr = SocketAddr::from_str("127.0.0.1:19975").unwrap();
Stratum::start(&addr, DummyManager::new(), None).expect("There should be no error starting stratum");
let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 2}"#;
let response = String::from_utf8(dummy_request(&addr, request)).unwrap();
assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":["dummy payload"],"id":2}"#), response);
}
#[test]
fn can_authorize() {
let addr = SocketAddr::from_str("127.0.0.1:19970").unwrap();
let stratum = Stratum::start(
&addr,
Arc::new(DummyManager::build().of_initial(r#"["dummy autorize payload"]"#)),
None
).expect("There should be no error starting stratum");
let request = r#"{"jsonrpc": "2.0", "method": "mining.authorize", "params": ["miner1", ""], "id": 1}"#;
let response = String::from_utf8(dummy_request(&addr, request)).unwrap();
assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#), response);
assert_eq!(1, stratum.workers.read().len());
}
#[test]
fn can_push_work() {
init_log();
let addr = SocketAddr::from_str("127.0.0.1:19995").unwrap();
let stratum = Stratum::start(
&addr,
Arc::new(DummyManager::build().of_initial(r#"["dummy autorize payload"]"#)),
None
).expect("There should be no error starting stratum");
let mut auth_request =
r#"{"jsonrpc": "2.0", "method": "mining.authorize", "params": ["miner1", ""], "id": 1}"#
.as_bytes()
.to_vec();
auth_request.extend(b"\n");
let mut core = Core::new().expect("Tokio Core should be created with no errors");
let timeout1 = Timeout::new(::std::time::Duration::from_millis(100), &core.handle())
.expect("There should be a timeout produced in message test");
let timeout2 = Timeout::new(::std::time::Duration::from_millis(100), &core.handle())
.expect("There should be a timeout produced in message test");
let mut buffer = vec![0u8; 2048];
let mut buffer2 = vec![0u8; 2048];
let stream = TcpStream::connect(&addr, &core.handle())
.and_then(|stream| {
io::write_all(stream, &auth_request)
})
.and_then(|(stream, _)| {
io::read(stream, &mut buffer)
})
.and_then(|(stream, _, _)| {
trace!(target: "stratum", "Received authorization confirmation");
timeout1.join(future::ok(stream))
})
.and_then(|(_, stream)| {
trace!(target: "stratum", "Pusing work to peers");
stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned())
.expect("Pushing work should produce no errors");
timeout2.join(future::ok(stream))
})
.and_then(|(_, stream)| {
trace!(target: "stratum", "Ready to read work from server");
io::read(stream, &mut buffer2)
})
.and_then(|(_, read_buf, len)| {
trace!(target: "stratum", "Received work from server");
future::ok(read_buf[0..len].to_vec())
});
let response = String::from_utf8(
core.run(stream).expect("Core should run with no errors")
).expect("Response should be utf-8");
assert_eq!(
"{ \"id\": 17, \"method\": \"mining.notify\", \"params\": { \"00040008\", \"100500\" } }\n",
response);
}
}