Stratum up (#4233)
* flush work * flush work * flush work * flush work * generalized notifiers * general setup with modules * general setup with modules * all binded * catch up with master * all dependencies injected * stratum another up * tcp update * submitwork routine * finalize & fix warnings * merge bugs, review fixes * merge bugs, review fixes * new cli mess cleanup * usage.txt swap * flush work * cli adopt * compilation with new cli sorted * subid space in json * serialization issues * grumbles addressed * more grumbles * remove last_work note for now * fix compilation * fix tests * merge bugs * no obliged ipc * moving notifiers * no optional feature now * refactored again * working on tests * refactor to new tcp/ip * stratum lib ok * ethcore crate ok * wip on tests * final test working * fix warnings, \n-terminated response * new compatibility * re-pushing work once anybody submitted * various review and general fixes * reviewe fixes * remove redundant notifier * one symbol -> huge bug * ensure write lock isn't held when calling handlers * extern declarations moved * options to stratum mod, SocketAddr strongly-typed instantiation * Minor style fix. * Whitespace and phrasing * Whitespace
This commit is contained in:
@@ -11,9 +11,9 @@ ethcore-ipc-codegen = { path = "../ipc/codegen" }
|
||||
|
||||
[dependencies]
|
||||
log = "0.3"
|
||||
jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" }
|
||||
jsonrpc-macros = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" }
|
||||
jsonrpc-tcp-server = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" }
|
||||
jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git" }
|
||||
jsonrpc-macros = { git = "https://github.com/ethcore/jsonrpc.git" }
|
||||
jsonrpc-tcp-server = { git = "https://github.com/ethcore/jsonrpc.git" }
|
||||
mio = { git = "https://github.com/ethcore/mio", branch = "v0.5.x" }
|
||||
ethcore-util = { path = "../util" }
|
||||
ethcore-devtools = { path = "../devtools" }
|
||||
@@ -22,6 +22,8 @@ env_logger = "0.3"
|
||||
ethcore-ipc = { path = "../ipc/rpc" }
|
||||
semver = "0.5"
|
||||
ethcore-ipc-nano = { path = "../ipc/nano" }
|
||||
futures = "0.1"
|
||||
tokio-core = "0.1"
|
||||
|
||||
[profile.release]
|
||||
debug = true
|
||||
|
||||
@@ -23,16 +23,14 @@ extern crate jsonrpc_macros;
|
||||
extern crate ethcore_util as util;
|
||||
extern crate ethcore_ipc as ipc;
|
||||
extern crate semver;
|
||||
extern crate futures;
|
||||
|
||||
#[cfg(test)]
|
||||
extern crate mio;
|
||||
#[cfg(test)]
|
||||
#[cfg(test)] extern crate tokio_core;
|
||||
extern crate ethcore_devtools as devtools;
|
||||
#[cfg(test)]
|
||||
extern crate env_logger;
|
||||
#[cfg(test)]
|
||||
#[macro_use]
|
||||
extern crate lazy_static;
|
||||
#[cfg(test)] extern crate env_logger;
|
||||
#[cfg(test)] #[macro_use] extern crate lazy_static;
|
||||
|
||||
use futures::{future, BoxFuture, Future};
|
||||
|
||||
mod traits {
|
||||
//! Stratum ipc interfaces specification
|
||||
@@ -45,8 +43,11 @@ pub use traits::{
|
||||
RemoteWorkHandler, RemoteJobDispatcher,
|
||||
};
|
||||
|
||||
use jsonrpc_tcp_server::Server as JsonRpcServer;
|
||||
use jsonrpc_core::{IoHandler, Params, to_value};
|
||||
use jsonrpc_tcp_server::{
|
||||
Server as JsonRpcServer, RequestContext, MetaExtractor, Dispatcher,
|
||||
PushMessageError
|
||||
};
|
||||
use jsonrpc_core::{MetaIoHandler, Params, to_value, Value, Metadata, Compatibility};
|
||||
use jsonrpc_macros::IoDelegate;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -54,25 +55,64 @@ use std::net::SocketAddr;
|
||||
use std::collections::{HashSet, HashMap};
|
||||
use util::{H256, Hashable, RwLock, RwLockReadGuard};
|
||||
|
||||
type RpcResult = Result<jsonrpc_core::Value, jsonrpc_core::Error>;
|
||||
type RpcResult = BoxFuture<jsonrpc_core::Value, jsonrpc_core::Error>;
|
||||
|
||||
struct StratumRpc {
|
||||
stratum: RwLock<Option<Arc<Stratum>>>,
|
||||
}
|
||||
|
||||
impl StratumRpc {
|
||||
fn subscribe(&self, params: Params) -> RpcResult {
|
||||
fn subscribe(&self, params: Params, meta: SocketMetadata) -> RpcResult {
|
||||
self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.")
|
||||
.subscribe(params)
|
||||
.subscribe(params, meta)
|
||||
}
|
||||
|
||||
fn authorize(&self, params: Params) -> RpcResult {
|
||||
fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult {
|
||||
self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.")
|
||||
.authorize(params)
|
||||
.authorize(params, meta)
|
||||
}
|
||||
|
||||
fn submit(&self, params: Params, meta: SocketMetadata) -> RpcResult {
|
||||
self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.")
|
||||
.submit(params, meta)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SocketMetadata {
|
||||
addr: SocketAddr,
|
||||
}
|
||||
|
||||
impl Default for SocketMetadata {
|
||||
fn default() -> Self {
|
||||
SocketMetadata { addr: "0.0.0.0:0".parse().unwrap() }
|
||||
}
|
||||
}
|
||||
|
||||
impl SocketMetadata {
|
||||
pub fn addr(&self) -> &SocketAddr {
|
||||
&self.addr
|
||||
}
|
||||
}
|
||||
|
||||
impl Metadata for SocketMetadata { }
|
||||
|
||||
impl From<SocketAddr> for SocketMetadata {
|
||||
fn from(addr: SocketAddr) -> SocketMetadata {
|
||||
SocketMetadata { addr: addr }
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PeerMetaExtractor;
|
||||
|
||||
impl MetaExtractor<SocketMetadata> for PeerMetaExtractor {
|
||||
fn extract(&self, context: &RequestContext) -> SocketMetadata {
|
||||
context.peer_addr.into()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Stratum {
|
||||
rpc_server: JsonRpcServer<()>,
|
||||
rpc_server: JsonRpcServer<SocketMetadata>,
|
||||
/// Subscribed clients
|
||||
subscribers: RwLock<Vec<SocketAddr>>,
|
||||
/// List of workers supposed to receive job update
|
||||
@@ -83,48 +123,92 @@ pub struct Stratum {
|
||||
workers: Arc<RwLock<HashMap<SocketAddr, String>>>,
|
||||
/// Secret if any
|
||||
secret: Option<H256>,
|
||||
/// Dispatch notify couinter
|
||||
notify_counter: RwLock<u32>,
|
||||
/// Message dispatcher (tcp/ip service)
|
||||
tcp_dispatcher: Dispatcher,
|
||||
}
|
||||
|
||||
const NOTIFY_COUNTER_INITIAL: u32 = 16;
|
||||
|
||||
impl Stratum {
|
||||
pub fn start(
|
||||
addr: &SocketAddr,
|
||||
dispatcher: Arc<JobDispatcher>,
|
||||
secret: Option<H256>,
|
||||
) -> Result<Arc<Stratum>, jsonrpc_tcp_server::Error> {
|
||||
) -> Result<Arc<Stratum>, Error> {
|
||||
|
||||
let rpc = Arc::new(StratumRpc {
|
||||
stratum: RwLock::new(None),
|
||||
});
|
||||
let mut delegate = IoDelegate::<StratumRpc>::new(rpc.clone());
|
||||
delegate.add_method("miner.subscribe", StratumRpc::subscribe);
|
||||
delegate.add_method("miner.authorize", StratumRpc::authorize);
|
||||
|
||||
let mut handler = IoHandler::default();
|
||||
let mut delegate = IoDelegate::<StratumRpc, SocketMetadata>::new(rpc.clone());
|
||||
delegate.add_method_with_meta("mining.subscribe", StratumRpc::subscribe);
|
||||
delegate.add_method_with_meta("mining.authorize", StratumRpc::authorize);
|
||||
delegate.add_method_with_meta("mining.submit", StratumRpc::submit);
|
||||
let mut handler = MetaIoHandler::<SocketMetadata>::with_compatibility(Compatibility::Both);
|
||||
handler.extend_with(delegate);
|
||||
let server = JsonRpcServer::new(addr, handler)?;
|
||||
|
||||
let server = JsonRpcServer::new(addr.clone(), Arc::new(handler))
|
||||
.extractor(Arc::new(PeerMetaExtractor) as Arc<MetaExtractor<SocketMetadata>>);
|
||||
|
||||
let stratum = Arc::new(Stratum {
|
||||
tcp_dispatcher: server.dispatcher(),
|
||||
rpc_server: server,
|
||||
subscribers: RwLock::new(Vec::new()),
|
||||
job_que: RwLock::new(HashSet::new()),
|
||||
dispatcher: dispatcher,
|
||||
workers: Arc::new(RwLock::new(HashMap::new())),
|
||||
secret: secret,
|
||||
notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL),
|
||||
});
|
||||
*rpc.stratum.write() = Some(stratum.clone());
|
||||
|
||||
stratum.rpc_server.run_async()?;
|
||||
let running_stratum = stratum.clone();
|
||||
::std::thread::spawn(move || running_stratum.rpc_server.run());
|
||||
|
||||
Ok(stratum)
|
||||
}
|
||||
|
||||
fn subscribe(&self, _params: Params) -> RpcResult {
|
||||
fn update_peers(&self) {
|
||||
if let Some(job) = self.dispatcher.job() {
|
||||
if let Err(e) = self.push_work_all(job) {
|
||||
warn!("Failed to update some of the peers: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn submit(&self, params: Params, _meta: SocketMetadata) -> RpcResult {
|
||||
future::ok(match params {
|
||||
Params::Array(vals) => {
|
||||
// first two elements are service messages (worker_id & job_id)
|
||||
match self.dispatcher.submit(vals.iter().skip(2)
|
||||
.filter_map(|val| match val { &Value::String(ref str) => Some(str.to_owned()), _ => None })
|
||||
.collect::<Vec<String>>()) {
|
||||
Ok(()) => {
|
||||
self.update_peers();
|
||||
to_value(true)
|
||||
},
|
||||
Err(submit_err) => {
|
||||
warn!("Error while submitting share: {:?}", submit_err);
|
||||
to_value(false)
|
||||
}
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
trace!(target: "stratum", "Invalid submit work format {:?}", params);
|
||||
to_value(false)
|
||||
}
|
||||
}).boxed()
|
||||
}
|
||||
|
||||
fn subscribe(&self, _params: Params, meta: SocketMetadata) -> RpcResult {
|
||||
use std::str::FromStr;
|
||||
|
||||
if let Some(context) = self.rpc_server.request_context() {
|
||||
self.subscribers.write().push(context.socket_addr);
|
||||
self.job_que.write().insert(context.socket_addr);
|
||||
trace!(target: "stratum", "Subscription request from {:?}", context.socket_addr);
|
||||
}
|
||||
Ok(match self.dispatcher.initial() {
|
||||
self.subscribers.write().push(meta.addr().clone());
|
||||
self.job_que.write().insert(meta.addr().clone());
|
||||
trace!(target: "stratum", "Subscription request from {:?}", meta.addr());
|
||||
|
||||
future::ok(match self.dispatcher.initial() {
|
||||
Some(initial) => match jsonrpc_core::Value::from_str(&initial) {
|
||||
Ok(val) => val,
|
||||
Err(e) => {
|
||||
@@ -133,26 +217,21 @@ impl Stratum {
|
||||
},
|
||||
},
|
||||
None => to_value(&[0u8; 0]),
|
||||
})
|
||||
}).boxed()
|
||||
}
|
||||
|
||||
fn authorize(&self, params: Params) -> RpcResult {
|
||||
params.parse::<(String, String)>().map(|(worker_id, secret)|{
|
||||
fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult {
|
||||
future::result(params.parse::<(String, String)>().map(|(worker_id, secret)|{
|
||||
if let Some(valid_secret) = self.secret {
|
||||
let hash = secret.sha3();
|
||||
if hash != valid_secret {
|
||||
return to_value(&false);
|
||||
}
|
||||
}
|
||||
if let Some(context) = self.rpc_server.request_context() {
|
||||
self.workers.write().insert(context.socket_addr, worker_id);
|
||||
to_value(&true)
|
||||
}
|
||||
else {
|
||||
warn!(target: "stratum", "Authorize without valid context received!");
|
||||
to_value(&false)
|
||||
}
|
||||
})
|
||||
trace!(target: "stratum", "New worker #{} registered", worker_id);
|
||||
self.workers.write().insert(meta.addr().clone(), worker_id);
|
||||
to_value(true)
|
||||
})).boxed()
|
||||
}
|
||||
|
||||
pub fn subscribers(&self) -> RwLockReadGuard<Vec<SocketAddr>> {
|
||||
@@ -161,31 +240,50 @@ impl Stratum {
|
||||
|
||||
pub fn maintain(&self) {
|
||||
let mut job_que = self.job_que.write();
|
||||
let workers = self.workers.read();
|
||||
let job_payload = self.dispatcher.job();
|
||||
for socket_addr in job_que.drain() {
|
||||
if let Some(worker_id) = workers.get(&socket_addr) {
|
||||
let job_payload = self.dispatcher.job(worker_id.to_owned());
|
||||
job_payload.map(
|
||||
|json| self.rpc_server.push_message(&socket_addr, json.as_bytes())
|
||||
);
|
||||
}
|
||||
else {
|
||||
trace!(
|
||||
target: "stratum",
|
||||
"Job queued for worker that is still not authorized, skipping ('{:?}')", socket_addr
|
||||
);
|
||||
}
|
||||
job_payload.as_ref().map(
|
||||
|json| self.tcp_dispatcher.push_message(&socket_addr, json.to_owned())
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PushWorkHandler for Stratum {
|
||||
fn push_work_all(&self, payload: String) -> Result<(), Error> {
|
||||
let workers = self.workers.read();
|
||||
println!("pushing work for {} workers", workers.len());
|
||||
for (ref addr, _) in workers.iter() {
|
||||
self.rpc_server.push_message(addr, payload.as_bytes())?;
|
||||
let hup_peers = {
|
||||
let workers = self.workers.read();
|
||||
let next_request_id = {
|
||||
let mut counter = self.notify_counter.write();
|
||||
if *counter == ::std::u32::MAX { *counter = NOTIFY_COUNTER_INITIAL; }
|
||||
else { *counter = *counter + 1 }
|
||||
*counter
|
||||
};
|
||||
|
||||
let mut hup_peers = HashSet::with_capacity(0); // most of the cases won't be needed, hence avoid allocation
|
||||
let workers_msg = format!("{{ \"id\": {}, \"method\": \"mining.notify\", \"params\": {} }}", next_request_id, payload);
|
||||
trace!(target: "stratum", "pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg);
|
||||
for (ref addr, _) in workers.iter() {
|
||||
trace!(target: "stratum", "pusing work to {}", addr);
|
||||
match self.tcp_dispatcher.push_message(addr, workers_msg.clone()) {
|
||||
Err(PushMessageError::NoSuchPeer) => {
|
||||
trace!(target: "stratum", "Worker no longer connected: {}", &addr);
|
||||
hup_peers.insert(*addr.clone());
|
||||
},
|
||||
Err(e) => {
|
||||
warn!(target: "stratum", "Unexpected transport error: {:?}", e);
|
||||
},
|
||||
Ok(_) => { },
|
||||
}
|
||||
}
|
||||
hup_peers
|
||||
};
|
||||
|
||||
if !hup_peers.is_empty() {
|
||||
let mut workers = self.workers.write();
|
||||
for hup_peer in hup_peers { workers.remove(&hup_peer); }
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -203,9 +301,9 @@ impl PushWorkHandler for Stratum {
|
||||
while que.len() > 0 {
|
||||
let next_worker = addrs[addr_index];
|
||||
let mut next_payload = que.drain(0..1);
|
||||
self.rpc_server.push_message(
|
||||
self.tcp_dispatcher.push_message(
|
||||
next_worker,
|
||||
next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist").as_bytes()
|
||||
next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist")
|
||||
)?;
|
||||
addr_index = addr_index + 1;
|
||||
}
|
||||
@@ -218,12 +316,20 @@ mod tests {
|
||||
use super::*;
|
||||
use std::str::FromStr;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio_core::reactor::{Core, Timeout};
|
||||
use tokio_core::net::TcpStream;
|
||||
use tokio_core::io;
|
||||
use futures::{Future, future};
|
||||
|
||||
pub struct VoidManager;
|
||||
|
||||
impl JobDispatcher for VoidManager { }
|
||||
impl JobDispatcher for VoidManager {
|
||||
fn submit(&self, _payload: Vec<String>) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref LOG_DUMMY: bool = {
|
||||
@@ -251,78 +357,42 @@ mod tests {
|
||||
let _ = *LOG_DUMMY;
|
||||
}
|
||||
|
||||
pub fn dummy_request(addr: &SocketAddr, buf: &[u8]) -> Vec<u8> {
|
||||
use std::io::{Read, Write};
|
||||
use mio::*;
|
||||
use mio::tcp::*;
|
||||
fn dummy_request(addr: &SocketAddr, data: &str) -> Vec<u8> {
|
||||
let mut core = Core::new().expect("Tokio Core should be created with no errors");
|
||||
let mut buffer = vec![0u8; 2048];
|
||||
|
||||
let mut poll = Poll::new().unwrap();
|
||||
let mut sock = TcpStream::connect(addr).unwrap();
|
||||
poll.register(&sock, Token(0), EventSet::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
|
||||
poll.poll(Some(50)).unwrap();
|
||||
sock.write_all(buf).unwrap();
|
||||
poll.reregister(&sock, Token(0), EventSet::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
|
||||
poll.poll(Some(50)).unwrap();
|
||||
let mut data_vec = data.as_bytes().to_vec();
|
||||
data_vec.extend(b"\n");
|
||||
|
||||
let mut buf = Vec::new();
|
||||
sock.read_to_end(&mut buf).unwrap_or_else(|_| { 0 });
|
||||
buf
|
||||
}
|
||||
let stream = TcpStream::connect(addr, &core.handle())
|
||||
.and_then(|stream| {
|
||||
io::write_all(stream, &data_vec)
|
||||
})
|
||||
.and_then(|(stream, _)| {
|
||||
io::read(stream, &mut buffer)
|
||||
})
|
||||
.and_then(|(_, read_buf, len)| {
|
||||
future::ok(read_buf[0..len].to_vec())
|
||||
});
|
||||
let result = core.run(stream).expect("Core should run with no errors");
|
||||
|
||||
pub fn dummy_async_waiter(addr: &SocketAddr, initial: Vec<String>, result: Arc<RwLock<Vec<String>>>) -> ::devtools::StopGuard {
|
||||
use std::io::{Read, Write};
|
||||
use mio::*;
|
||||
use mio::tcp::*;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
let stop_guard = ::devtools::StopGuard::new();
|
||||
let collector = result.clone();
|
||||
let thread_stop = stop_guard.share();
|
||||
let socket_addr = addr.clone();
|
||||
thread::spawn(move || {
|
||||
let mut poll = Poll::new().unwrap();
|
||||
let mut sock = TcpStream::connect(&socket_addr).unwrap();
|
||||
poll.register(&sock, Token(0), EventSet::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
|
||||
|
||||
for initial_req in initial {
|
||||
poll.poll(Some(120)).unwrap();
|
||||
sock.write_all(initial_req.as_bytes()).unwrap();
|
||||
poll.reregister(&sock, Token(0), EventSet::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
|
||||
poll.poll(Some(120)).unwrap();
|
||||
|
||||
let mut buf = Vec::new();
|
||||
sock.read_to_end(&mut buf).unwrap_or_else(|_| { 0 });
|
||||
collector.write().unwrap().push(String::from_utf8(buf).unwrap());
|
||||
poll.reregister(&sock, Token(0), EventSet::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
|
||||
}
|
||||
|
||||
while !thread_stop.load(Ordering::Relaxed) {
|
||||
poll.reregister(&sock, Token(0), EventSet::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
|
||||
poll.poll(Some(120)).unwrap();
|
||||
|
||||
let mut buf = Vec::new();
|
||||
sock.read_to_end(&mut buf).unwrap_or_else(|_| { 0 });
|
||||
if buf.len() > 0 {
|
||||
collector.write().unwrap().push(String::from_utf8(buf).unwrap());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
stop_guard
|
||||
result
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_be_started() {
|
||||
let stratum = Stratum::start(&SocketAddr::from_str("0.0.0.0:19980").unwrap(), Arc::new(VoidManager), None);
|
||||
let stratum = Stratum::start(&SocketAddr::from_str("127.0.0.1:19980").unwrap(), Arc::new(VoidManager), None);
|
||||
assert!(stratum.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn records_subscriber() {
|
||||
let addr = SocketAddr::from_str("0.0.0.0:19985").unwrap();
|
||||
init_log();
|
||||
|
||||
let addr = SocketAddr::from_str("127.0.0.1:19985").unwrap();
|
||||
let stratum = Stratum::start(&addr, Arc::new(VoidManager), None).unwrap();
|
||||
let request = r#"{"jsonrpc": "2.0", "method": "miner.subscribe", "params": [], "id": 1}"#;
|
||||
dummy_request(&addr, request.as_bytes());
|
||||
let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 1}"#;
|
||||
dummy_request(&addr, request);
|
||||
assert_eq!(1, stratum.subscribers.read().len());
|
||||
}
|
||||
|
||||
@@ -349,33 +419,43 @@ mod tests {
|
||||
fn initial(&self) -> Option<String> {
|
||||
Some(self.initial_payload.clone())
|
||||
}
|
||||
|
||||
fn submit(&self, _payload: Vec<String>) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn terminated_str(origin: &'static str) -> String {
|
||||
let mut s = String::new();
|
||||
s.push_str(origin);
|
||||
s.push_str("\n");
|
||||
s
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn receives_initial_paylaod() {
|
||||
let addr = SocketAddr::from_str("0.0.0.0:19975").unwrap();
|
||||
Stratum::start(&addr, DummyManager::new(), None).unwrap();
|
||||
let request = r#"{"jsonrpc": "2.0", "method": "miner.subscribe", "params": [], "id": 1}"#;
|
||||
let addr = SocketAddr::from_str("127.0.0.1:19975").unwrap();
|
||||
Stratum::start(&addr, DummyManager::new(), None).expect("There should be no error starting stratum");
|
||||
let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 2}"#;
|
||||
|
||||
let response = String::from_utf8(dummy_request(&addr, request.as_bytes())).unwrap();
|
||||
let response = String::from_utf8(dummy_request(&addr, request)).unwrap();
|
||||
|
||||
assert_eq!(r#"{"jsonrpc":"2.0","result":["dummy payload"],"id":1}"#, response);
|
||||
assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":["dummy payload"],"id":2}"#), response);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_authorize() {
|
||||
let addr = SocketAddr::from_str("0.0.0.0:19970").unwrap();
|
||||
let addr = SocketAddr::from_str("127.0.0.1:19970").unwrap();
|
||||
let stratum = Stratum::start(
|
||||
&addr,
|
||||
Arc::new(DummyManager::build().of_initial(r#"["dummy autorize payload"]"#)),
|
||||
None
|
||||
).unwrap();
|
||||
).expect("There should be no error starting stratum");
|
||||
|
||||
let request = r#"{"jsonrpc": "2.0", "method": "miner.authorize", "params": ["miner1", ""], "id": 1}"#;
|
||||
let request = r#"{"jsonrpc": "2.0", "method": "mining.authorize", "params": ["miner1", ""], "id": 1}"#;
|
||||
let response = String::from_utf8(dummy_request(&addr, request)).unwrap();
|
||||
|
||||
let response = String::from_utf8(dummy_request(&addr, request.as_bytes())).unwrap();
|
||||
|
||||
assert_eq!(r#"{"jsonrpc":"2.0","result":true,"id":1}"#, response);
|
||||
assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#), response);
|
||||
assert_eq!(1, stratum.workers.read().len());
|
||||
}
|
||||
|
||||
@@ -383,26 +463,57 @@ mod tests {
|
||||
fn can_push_work() {
|
||||
init_log();
|
||||
|
||||
let addr = SocketAddr::from_str("0.0.0.0:19965").unwrap();
|
||||
let addr = SocketAddr::from_str("127.0.0.1:19995").unwrap();
|
||||
let stratum = Stratum::start(
|
||||
&addr,
|
||||
Arc::new(DummyManager::build().of_initial(r#"["dummy push request payload"]"#)),
|
||||
Arc::new(DummyManager::build().of_initial(r#"["dummy autorize payload"]"#)),
|
||||
None
|
||||
).unwrap();
|
||||
).expect("There should be no error starting stratum");
|
||||
|
||||
let result = Arc::new(RwLock::new(Vec::<String>::new()));
|
||||
let _stop = dummy_async_waiter(
|
||||
&addr,
|
||||
vec![
|
||||
r#"{"jsonrpc": "2.0", "method": "miner.authorize", "params": ["miner1", ""], "id": 1}"#.to_owned(),
|
||||
],
|
||||
result.clone(),
|
||||
);
|
||||
::std::thread::park_timeout(::std::time::Duration::from_millis(150));
|
||||
let mut auth_request =
|
||||
r#"{"jsonrpc": "2.0", "method": "mining.authorize", "params": ["miner1", ""], "id": 1}"#
|
||||
.as_bytes()
|
||||
.to_vec();
|
||||
auth_request.extend(b"\n");
|
||||
|
||||
stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned()).unwrap();
|
||||
::std::thread::park_timeout(::std::time::Duration::from_millis(150));
|
||||
let mut core = Core::new().expect("Tokio Core should be created with no errors");
|
||||
let timeout1 = Timeout::new(::std::time::Duration::from_millis(100), &core.handle())
|
||||
.expect("There should be a timeout produced in message test");
|
||||
let timeout2 = Timeout::new(::std::time::Duration::from_millis(100), &core.handle())
|
||||
.expect("There should be a timeout produced in message test");
|
||||
let mut buffer = vec![0u8; 2048];
|
||||
let mut buffer2 = vec![0u8; 2048];
|
||||
let stream = TcpStream::connect(&addr, &core.handle())
|
||||
.and_then(|stream| {
|
||||
io::write_all(stream, &auth_request)
|
||||
})
|
||||
.and_then(|(stream, _)| {
|
||||
io::read(stream, &mut buffer)
|
||||
})
|
||||
.and_then(|(stream, _, _)| {
|
||||
trace!(target: "stratum", "Received authorization confirmation");
|
||||
timeout1.join(future::ok(stream))
|
||||
})
|
||||
.and_then(|(_, stream)| {
|
||||
trace!(target: "stratum", "Pusing work to peers");
|
||||
stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned())
|
||||
.expect("Pushing work should produce no errors");
|
||||
timeout2.join(future::ok(stream))
|
||||
})
|
||||
.and_then(|(_, stream)| {
|
||||
trace!(target: "stratum", "Ready to read work from server");
|
||||
io::read(stream, &mut buffer2)
|
||||
})
|
||||
.and_then(|(_, read_buf, len)| {
|
||||
trace!(target: "stratum", "Received work from server");
|
||||
future::ok(read_buf[0..len].to_vec())
|
||||
});
|
||||
let response = String::from_utf8(
|
||||
core.run(stream).expect("Core should run with no errors")
|
||||
).expect("Response should be utf-8");
|
||||
|
||||
assert_eq!(2, result.read().unwrap().len());
|
||||
assert_eq!(
|
||||
"{ \"id\": 17, \"method\": \"mining.notify\", \"params\": { \"00040008\", \"100500\" } }\n",
|
||||
response);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ use std;
|
||||
use std::error::Error as StdError;
|
||||
use util::H256;
|
||||
use ipc::IpcConfig;
|
||||
use jsonrpc_tcp_server::PushMessageError;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[binary]
|
||||
@@ -25,6 +26,8 @@ pub enum Error {
|
||||
NoWork,
|
||||
NoWorkers,
|
||||
Io(String),
|
||||
Tcp(String),
|
||||
Dispatch(String),
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for Error {
|
||||
@@ -33,6 +36,12 @@ impl From<std::io::Error> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PushMessageError> for Error {
|
||||
fn from(err: PushMessageError) -> Self {
|
||||
Error::Tcp(format!("Push message error: {:?}", err))
|
||||
}
|
||||
}
|
||||
|
||||
#[ipc(client_ident="RemoteJobDispatcher")]
|
||||
/// Interface that can provide pow/blockchain-specific responses for the clients
|
||||
pub trait JobDispatcher: Send + Sync {
|
||||
@@ -41,7 +50,9 @@ pub trait JobDispatcher: Send + Sync {
|
||||
// json for difficulty dispatch
|
||||
fn difficulty(&self) -> Option<String> { None }
|
||||
// json for job update given worker_id (payload manager should split job!)
|
||||
fn job(&self, _worker_id: String) -> Option<String> { None }
|
||||
fn job(&self) -> Option<String> { None }
|
||||
// miner job result
|
||||
fn submit(&self, payload: Vec<String>) -> Result<(), Error>;
|
||||
}
|
||||
|
||||
#[ipc(client_ident="RemoteWorkHandler")]
|
||||
@@ -56,7 +67,9 @@ pub trait PushWorkHandler: Send + Sync {
|
||||
|
||||
#[binary]
|
||||
pub struct ServiceConfiguration {
|
||||
pub io_path: String,
|
||||
pub listen_addr: String,
|
||||
pub port: u16,
|
||||
pub secret: Option<H256>,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user