refactor stratum to remove retain cycle (#7827)

* refactor stratum to remove retain cycle, fixed #7823

* fix tests
This commit is contained in:
Marek Kotewicz 2018-02-07 17:15:40 +01:00 committed by GitHub
parent b4ed51c5f1
commit f244ebeb4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -47,68 +47,84 @@ use std::net::SocketAddr;
use std::collections::{HashSet, HashMap}; use std::collections::{HashSet, HashMap};
use hash::keccak; use hash::keccak;
use ethereum_types::H256; use ethereum_types::H256;
use parking_lot::{RwLock, RwLockReadGuard}; use parking_lot::RwLock;
type RpcResult = Result<jsonrpc_core::Value, jsonrpc_core::Error>; type RpcResult = Result<jsonrpc_core::Value, jsonrpc_core::Error>;
const NOTIFY_COUNTER_INITIAL: u32 = 16; const NOTIFY_COUNTER_INITIAL: u32 = 16;
struct StratumRpc { /// Container which owns rpc server and stratum implementation
stratum: RwLock<Option<Arc<Stratum>>>,
}
impl StratumRpc {
fn subscribe(&self, params: Params, meta: SocketMetadata) -> RpcResult {
self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.")
.subscribe(params, meta)
}
fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult {
self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.")
.authorize(params, meta)
}
fn submit(&self, params: Params, meta: SocketMetadata) -> RpcResult {
self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.")
.submit(params, meta)
}
}
#[derive(Clone)]
pub struct SocketMetadata {
addr: SocketAddr,
}
impl Default for SocketMetadata {
fn default() -> Self {
SocketMetadata { addr: "0.0.0.0:0".parse().unwrap() }
}
}
impl SocketMetadata {
pub fn addr(&self) -> &SocketAddr {
&self.addr
}
}
impl Metadata for SocketMetadata { }
impl From<SocketAddr> for SocketMetadata {
fn from(addr: SocketAddr) -> SocketMetadata {
SocketMetadata { addr: addr }
}
}
pub struct PeerMetaExtractor;
impl MetaExtractor<SocketMetadata> for PeerMetaExtractor {
fn extract(&self, context: &RequestContext) -> SocketMetadata {
context.peer_addr.into()
}
}
pub struct Stratum { pub struct Stratum {
/// RPC server
///
/// It is an `Option` so it can be easily closed and released during `drop` phase
rpc_server: Option<JsonRpcServer>, rpc_server: Option<JsonRpcServer>,
/// stratum protocol implementation
///
/// It is owned by a container and rpc server
implementation: Arc<StratumImpl>,
/// Message dispatcher (tcp/ip service)
///
/// Used to push messages to peers
tcp_dispatcher: Dispatcher,
}
impl Stratum {
pub fn start(
addr: &SocketAddr,
dispatcher: Arc<JobDispatcher>,
secret: Option<H256>,
) -> Result<Arc<Stratum>, Error> {
let implementation = Arc::new(StratumImpl {
subscribers: RwLock::new(Vec::new()),
job_que: RwLock::new(HashSet::new()),
dispatcher,
workers: Arc::new(RwLock::new(HashMap::new())),
secret,
notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL),
});
let mut delegate = IoDelegate::<StratumImpl, SocketMetadata>::new(implementation.clone());
delegate.add_method_with_meta("mining.subscribe", StratumImpl::subscribe);
delegate.add_method_with_meta("mining.authorize", StratumImpl::authorize);
delegate.add_method_with_meta("mining.submit", StratumImpl::submit);
let mut handler = MetaIoHandler::<SocketMetadata>::with_compatibility(Compatibility::Both);
handler.extend_with(delegate);
let server_builder = JsonRpcServerBuilder::new(handler);
let tcp_dispatcher = server_builder.dispatcher();
let server_builder = server_builder.session_meta_extractor(PeerMetaExtractor::new(tcp_dispatcher.clone()));
let server = server_builder.start(addr)?;
let stratum = Arc::new(Stratum {
rpc_server: Some(server),
implementation,
tcp_dispatcher,
});
Ok(stratum)
}
}
impl PushWorkHandler for Stratum {
fn push_work_all(&self, payload: String) -> Result<(), Error> {
self.implementation.push_work_all(payload, &self.tcp_dispatcher)
}
fn push_work(&self, payloads: Vec<String>) -> Result<(), Error> {
self.implementation.push_work(payloads, &self.tcp_dispatcher)
}
}
impl Drop for Stratum {
fn drop(&mut self) {
// shut down rpc server
self.rpc_server.take().map(|server| server.close());
}
}
struct StratumImpl {
/// Subscribed clients /// Subscribed clients
subscribers: RwLock<Vec<SocketAddr>>, subscribers: RwLock<Vec<SocketAddr>>,
/// List of workers supposed to receive job update /// List of workers supposed to receive job update
@ -121,84 +137,10 @@ pub struct Stratum {
secret: Option<H256>, secret: Option<H256>,
/// Dispatch notify couinter /// Dispatch notify couinter
notify_counter: RwLock<u32>, notify_counter: RwLock<u32>,
/// Message dispatcher (tcp/ip service)
tcp_dispatcher: Dispatcher,
} }
impl Drop for Stratum { impl StratumImpl {
fn drop(&mut self) { /// rpc method `mining.subscribe`
self.rpc_server.take().map(|server| server.close());
}
}
impl Stratum {
pub fn start(
addr: &SocketAddr,
dispatcher: Arc<JobDispatcher>,
secret: Option<H256>,
) -> Result<Arc<Stratum>, Error> {
let rpc = Arc::new(StratumRpc {
stratum: RwLock::new(None),
});
let mut delegate = IoDelegate::<StratumRpc, SocketMetadata>::new(rpc.clone());
delegate.add_method_with_meta("mining.subscribe", StratumRpc::subscribe);
delegate.add_method_with_meta("mining.authorize", StratumRpc::authorize);
delegate.add_method_with_meta("mining.submit", StratumRpc::submit);
let mut handler = MetaIoHandler::<SocketMetadata>::with_compatibility(Compatibility::Both);
handler.extend_with(delegate);
let server = JsonRpcServerBuilder::new(handler)
.session_meta_extractor(PeerMetaExtractor);
let tcp_dispatcher = server.dispatcher();
let server = server.start(addr)?;
let stratum = Arc::new(Stratum {
tcp_dispatcher: tcp_dispatcher,
rpc_server: Some(server),
subscribers: RwLock::new(Vec::new()),
job_que: RwLock::new(HashSet::new()),
dispatcher: dispatcher,
workers: Arc::new(RwLock::new(HashMap::new())),
secret: secret,
notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL),
});
*rpc.stratum.write() = Some(stratum.clone());
Ok(stratum)
}
fn update_peers(&self) {
if let Some(job) = self.dispatcher.job() {
if let Err(e) = self.push_work_all(job) {
warn!("Failed to update some of the peers: {:?}", e);
}
}
}
fn submit(&self, params: Params, _meta: SocketMetadata) -> RpcResult {
Ok(match params {
Params::Array(vals) => {
// first two elements are service messages (worker_id & job_id)
match self.dispatcher.submit(vals.iter().skip(2)
.filter_map(|val| match val { &Value::String(ref str) => Some(str.to_owned()), _ => None })
.collect::<Vec<String>>()) {
Ok(()) => {
self.update_peers();
to_value(true)
},
Err(submit_err) => {
warn!("Error while submitting share: {:?}", submit_err);
to_value(false)
}
}
},
_ => {
trace!(target: "stratum", "Invalid submit work format {:?}", params);
to_value(false)
}
}.expect("Only true/false is returned and it's always serializable; qed"))
}
fn subscribe(&self, _params: Params, meta: SocketMetadata) -> RpcResult { fn subscribe(&self, _params: Params, meta: SocketMetadata) -> RpcResult {
use std::str::FromStr; use std::str::FromStr;
@ -218,6 +160,7 @@ impl Stratum {
}.expect("Empty slices are serializable; qed")) }.expect("Empty slices are serializable; qed"))
} }
/// rpc method `mining.authorize`
fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult { fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult {
params.parse::<(String, String)>().map(|(worker_id, secret)|{ params.parse::<(String, String)>().map(|(worker_id, secret)|{
if let Some(valid_secret) = self.secret { if let Some(valid_secret) = self.secret {
@ -232,23 +175,44 @@ impl Stratum {
}).map(|v| v.expect("Only true/false is returned and it's always serializable; qed")) }).map(|v| v.expect("Only true/false is returned and it's always serializable; qed"))
} }
pub fn subscribers(&self) -> RwLockReadGuard<Vec<SocketAddr>> { /// rpc method `mining.submit`
self.subscribers.read() fn submit(&self, params: Params, meta: SocketMetadata) -> RpcResult {
Ok(match params {
Params::Array(vals) => {
// first two elements are service messages (worker_id & job_id)
match self.dispatcher.submit(vals.iter().skip(2)
.filter_map(|val| match *val {
Value::String(ref s) => Some(s.to_owned()),
_ => None
})
.collect::<Vec<String>>()) {
Ok(()) => {
self.update_peers(&meta.tcp_dispatcher.expect("tcp_dispatcher is always initialized; qed"));
to_value(true)
},
Err(submit_err) => {
warn!("Error while submitting share: {:?}", submit_err);
to_value(false)
}
}
},
_ => {
trace!(target: "stratum", "Invalid submit work format {:?}", params);
to_value(false)
}
}.expect("Only true/false is returned and it's always serializable; qed"))
} }
pub fn maintain(&self) { /// Helper method
let mut job_que = self.job_que.write(); fn update_peers(&self, tcp_dispatcher: &Dispatcher) {
let job_payload = self.dispatcher.job(); if let Some(job) = self.dispatcher.job() {
for socket_addr in job_que.drain() { if let Err(e) = self.push_work_all(job, tcp_dispatcher) {
job_payload.as_ref().map( warn!("Failed to update some of the peers: {:?}", e);
|json| self.tcp_dispatcher.push_message(&socket_addr, json.to_owned()) }
);
} }
} }
}
impl PushWorkHandler for Stratum { fn push_work_all(&self, payload: String, tcp_dispatcher: &Dispatcher) -> Result<(), Error> {
fn push_work_all(&self, payload: String) -> Result<(), Error> {
let hup_peers = { let hup_peers = {
let workers = self.workers.read(); let workers = self.workers.read();
let next_request_id = { let next_request_id = {
@ -263,7 +227,7 @@ impl PushWorkHandler for Stratum {
trace!(target: "stratum", "pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg); trace!(target: "stratum", "pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg);
for (ref addr, _) in workers.iter() { for (ref addr, _) in workers.iter() {
trace!(target: "stratum", "pusing work to {}", addr); trace!(target: "stratum", "pusing work to {}", addr);
match self.tcp_dispatcher.push_message(addr, workers_msg.clone()) { match tcp_dispatcher.push_message(addr, workers_msg.clone()) {
Err(PushMessageError::NoSuchPeer) => { Err(PushMessageError::NoSuchPeer) => {
trace!(target: "stratum", "Worker no longer connected: {}", &addr); trace!(target: "stratum", "Worker no longer connected: {}", &addr);
hup_peers.insert(*addr.clone()); hup_peers.insert(*addr.clone());
@ -285,7 +249,7 @@ impl PushWorkHandler for Stratum {
Ok(()) Ok(())
} }
fn push_work(&self, payloads: Vec<String>) -> Result<(), Error> { fn push_work(&self, payloads: Vec<String>, tcp_dispatcher: &Dispatcher) -> Result<(), Error> {
if !payloads.len() > 0 { if !payloads.len() > 0 {
return Err(Error::NoWork); return Err(Error::NoWork);
} }
@ -299,16 +263,63 @@ impl PushWorkHandler for Stratum {
while que.len() > 0 { while que.len() > 0 {
let next_worker = addrs[addr_index]; let next_worker = addrs[addr_index];
let mut next_payload = que.drain(0..1); let mut next_payload = que.drain(0..1);
self.tcp_dispatcher.push_message( tcp_dispatcher.push_message(
next_worker, next_worker,
next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist") next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist")
)?; )?;
addr_index = addr_index + 1; addr_index = addr_index + 1;
} }
Ok(()) Ok(())
} }
} }
#[derive(Clone)]
pub struct SocketMetadata {
addr: SocketAddr,
// with the new version of jsonrpc-core, SocketMetadata
// won't have to implement default, so this field will not
// have to be an Option
tcp_dispatcher: Option<Dispatcher>,
}
impl Default for SocketMetadata {
fn default() -> Self {
SocketMetadata {
addr: "0.0.0.0:0".parse().unwrap(),
tcp_dispatcher: None,
}
}
}
impl SocketMetadata {
pub fn addr(&self) -> &SocketAddr {
&self.addr
}
}
impl Metadata for SocketMetadata { }
pub struct PeerMetaExtractor {
tcp_dispatcher: Dispatcher,
}
impl PeerMetaExtractor {
fn new(tcp_dispatcher: Dispatcher) -> Self {
PeerMetaExtractor {
tcp_dispatcher,
}
}
}
impl MetaExtractor<SocketMetadata> for PeerMetaExtractor {
fn extract(&self, context: &RequestContext) -> SocketMetadata {
SocketMetadata {
addr: context.peer_addr,
tcp_dispatcher: Some(self.tcp_dispatcher.clone()),
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -367,7 +378,7 @@ mod tests {
let stratum = Stratum::start(&addr, Arc::new(VoidManager), None).unwrap(); let stratum = Stratum::start(&addr, Arc::new(VoidManager), None).unwrap();
let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 1}"#; let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 1}"#;
dummy_request(&addr, request); dummy_request(&addr, request);
assert_eq!(1, stratum.subscribers.read().len()); assert_eq!(1, stratum.implementation.subscribers.read().len());
} }
struct DummyManager { struct DummyManager {
@ -409,7 +420,7 @@ mod tests {
#[test] #[test]
fn receives_initial_paylaod() { fn receives_initial_paylaod() {
let addr = SocketAddr::from_str("127.0.0.1:19975").unwrap(); let addr = SocketAddr::from_str("127.0.0.1:19975").unwrap();
Stratum::start(&addr, DummyManager::new(), None).expect("There should be no error starting stratum"); let _stratum = Stratum::start(&addr, DummyManager::new(), None).expect("There should be no error starting stratum");
let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 2}"#; let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 2}"#;
let response = String::from_utf8(dummy_request(&addr, request)).unwrap(); let response = String::from_utf8(dummy_request(&addr, request)).unwrap();
@ -430,7 +441,7 @@ mod tests {
let response = String::from_utf8(dummy_request(&addr, request)).unwrap(); let response = String::from_utf8(dummy_request(&addr, request)).unwrap();
assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#), response); assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#), response);
assert_eq!(1, stratum.workers.read().len()); assert_eq!(1, stratum.implementation.workers.read().len());
} }
#[test] #[test]