From f244ebeb4ad559ec68d09e4c3ada3d0a486e3168 Mon Sep 17 00:00:00 2001 From: Marek Kotewicz Date: Wed, 7 Feb 2018 17:15:40 +0100 Subject: [PATCH] refactor stratum to remove retain cycle (#7827) * refactor stratum to remove retain cycle, fixed #7823 * fix tests --- stratum/src/lib.rs | 315 +++++++++++++++++++++++---------------------- 1 file changed, 163 insertions(+), 152 deletions(-) diff --git a/stratum/src/lib.rs b/stratum/src/lib.rs index 1efaeb052..d43bcc556 100644 --- a/stratum/src/lib.rs +++ b/stratum/src/lib.rs @@ -47,68 +47,84 @@ use std::net::SocketAddr; use std::collections::{HashSet, HashMap}; use hash::keccak; use ethereum_types::H256; -use parking_lot::{RwLock, RwLockReadGuard}; +use parking_lot::RwLock; type RpcResult = Result; const NOTIFY_COUNTER_INITIAL: u32 = 16; -struct StratumRpc { - stratum: RwLock>>, -} - -impl StratumRpc { - fn subscribe(&self, params: Params, meta: SocketMetadata) -> RpcResult { - self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.") - .subscribe(params, meta) - } - - fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult { - self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.") - .authorize(params, meta) - } - - fn submit(&self, params: Params, meta: SocketMetadata) -> RpcResult { - self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.") - .submit(params, meta) - } -} - -#[derive(Clone)] -pub struct SocketMetadata { - addr: SocketAddr, -} - -impl Default for SocketMetadata { - fn default() -> Self { - SocketMetadata { addr: "0.0.0.0:0".parse().unwrap() } - } -} - -impl SocketMetadata { - pub fn addr(&self) -> &SocketAddr { - &self.addr - } -} - -impl Metadata for SocketMetadata { } - -impl From for SocketMetadata { - fn from(addr: SocketAddr) -> SocketMetadata { - SocketMetadata { addr: addr } - } -} - -pub struct PeerMetaExtractor; - -impl MetaExtractor for PeerMetaExtractor { - fn extract(&self, context: &RequestContext) -> SocketMetadata { - context.peer_addr.into() - } -} - +/// Container which owns rpc server and stratum implementation pub struct Stratum { + /// RPC server + /// + /// It is an `Option` so it can be easily closed and released during `drop` phase rpc_server: Option, + /// stratum protocol implementation + /// + /// It is owned by a container and rpc server + implementation: Arc, + /// Message dispatcher (tcp/ip service) + /// + /// Used to push messages to peers + tcp_dispatcher: Dispatcher, +} + +impl Stratum { + pub fn start( + addr: &SocketAddr, + dispatcher: Arc, + secret: Option, + ) -> Result, Error> { + + let implementation = Arc::new(StratumImpl { + subscribers: RwLock::new(Vec::new()), + job_que: RwLock::new(HashSet::new()), + dispatcher, + workers: Arc::new(RwLock::new(HashMap::new())), + secret, + notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL), + }); + + let mut delegate = IoDelegate::::new(implementation.clone()); + delegate.add_method_with_meta("mining.subscribe", StratumImpl::subscribe); + delegate.add_method_with_meta("mining.authorize", StratumImpl::authorize); + delegate.add_method_with_meta("mining.submit", StratumImpl::submit); + let mut handler = MetaIoHandler::::with_compatibility(Compatibility::Both); + handler.extend_with(delegate); + + let server_builder = JsonRpcServerBuilder::new(handler); + let tcp_dispatcher = server_builder.dispatcher(); + let server_builder = server_builder.session_meta_extractor(PeerMetaExtractor::new(tcp_dispatcher.clone())); + let server = server_builder.start(addr)?; + + let stratum = Arc::new(Stratum { + rpc_server: Some(server), + implementation, + tcp_dispatcher, + }); + + Ok(stratum) + } +} + +impl PushWorkHandler for Stratum { + fn push_work_all(&self, payload: String) -> Result<(), Error> { + self.implementation.push_work_all(payload, &self.tcp_dispatcher) + } + + fn push_work(&self, payloads: Vec) -> Result<(), Error> { + self.implementation.push_work(payloads, &self.tcp_dispatcher) + } +} + +impl Drop for Stratum { + fn drop(&mut self) { + // shut down rpc server + self.rpc_server.take().map(|server| server.close()); + } +} + +struct StratumImpl { /// Subscribed clients subscribers: RwLock>, /// List of workers supposed to receive job update @@ -121,84 +137,10 @@ pub struct Stratum { secret: Option, /// Dispatch notify couinter notify_counter: RwLock, - /// Message dispatcher (tcp/ip service) - tcp_dispatcher: Dispatcher, } -impl Drop for Stratum { - fn drop(&mut self) { - self.rpc_server.take().map(|server| server.close()); - } -} - -impl Stratum { - pub fn start( - addr: &SocketAddr, - dispatcher: Arc, - secret: Option, - ) -> Result, Error> { - - let rpc = Arc::new(StratumRpc { - stratum: RwLock::new(None), - }); - let mut delegate = IoDelegate::::new(rpc.clone()); - delegate.add_method_with_meta("mining.subscribe", StratumRpc::subscribe); - delegate.add_method_with_meta("mining.authorize", StratumRpc::authorize); - delegate.add_method_with_meta("mining.submit", StratumRpc::submit); - let mut handler = MetaIoHandler::::with_compatibility(Compatibility::Both); - handler.extend_with(delegate); - - let server = JsonRpcServerBuilder::new(handler) - .session_meta_extractor(PeerMetaExtractor); - let tcp_dispatcher = server.dispatcher(); - let server = server.start(addr)?; - - let stratum = Arc::new(Stratum { - tcp_dispatcher: tcp_dispatcher, - rpc_server: Some(server), - subscribers: RwLock::new(Vec::new()), - job_que: RwLock::new(HashSet::new()), - dispatcher: dispatcher, - workers: Arc::new(RwLock::new(HashMap::new())), - secret: secret, - notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL), - }); - *rpc.stratum.write() = Some(stratum.clone()); - Ok(stratum) - } - - fn update_peers(&self) { - if let Some(job) = self.dispatcher.job() { - if let Err(e) = self.push_work_all(job) { - warn!("Failed to update some of the peers: {:?}", e); - } - } - } - - fn submit(&self, params: Params, _meta: SocketMetadata) -> RpcResult { - Ok(match params { - Params::Array(vals) => { - // first two elements are service messages (worker_id & job_id) - match self.dispatcher.submit(vals.iter().skip(2) - .filter_map(|val| match val { &Value::String(ref str) => Some(str.to_owned()), _ => None }) - .collect::>()) { - Ok(()) => { - self.update_peers(); - to_value(true) - }, - Err(submit_err) => { - warn!("Error while submitting share: {:?}", submit_err); - to_value(false) - } - } - }, - _ => { - trace!(target: "stratum", "Invalid submit work format {:?}", params); - to_value(false) - } - }.expect("Only true/false is returned and it's always serializable; qed")) - } - +impl StratumImpl { + /// rpc method `mining.subscribe` fn subscribe(&self, _params: Params, meta: SocketMetadata) -> RpcResult { use std::str::FromStr; @@ -218,6 +160,7 @@ impl Stratum { }.expect("Empty slices are serializable; qed")) } + /// rpc method `mining.authorize` fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult { params.parse::<(String, String)>().map(|(worker_id, secret)|{ if let Some(valid_secret) = self.secret { @@ -232,23 +175,44 @@ impl Stratum { }).map(|v| v.expect("Only true/false is returned and it's always serializable; qed")) } - pub fn subscribers(&self) -> RwLockReadGuard> { - self.subscribers.read() + /// rpc method `mining.submit` + fn submit(&self, params: Params, meta: SocketMetadata) -> RpcResult { + Ok(match params { + Params::Array(vals) => { + // first two elements are service messages (worker_id & job_id) + match self.dispatcher.submit(vals.iter().skip(2) + .filter_map(|val| match *val { + Value::String(ref s) => Some(s.to_owned()), + _ => None + }) + .collect::>()) { + Ok(()) => { + self.update_peers(&meta.tcp_dispatcher.expect("tcp_dispatcher is always initialized; qed")); + to_value(true) + }, + Err(submit_err) => { + warn!("Error while submitting share: {:?}", submit_err); + to_value(false) + } + } + }, + _ => { + trace!(target: "stratum", "Invalid submit work format {:?}", params); + to_value(false) + } + }.expect("Only true/false is returned and it's always serializable; qed")) } - pub fn maintain(&self) { - let mut job_que = self.job_que.write(); - let job_payload = self.dispatcher.job(); - for socket_addr in job_que.drain() { - job_payload.as_ref().map( - |json| self.tcp_dispatcher.push_message(&socket_addr, json.to_owned()) - ); + /// Helper method + fn update_peers(&self, tcp_dispatcher: &Dispatcher) { + if let Some(job) = self.dispatcher.job() { + if let Err(e) = self.push_work_all(job, tcp_dispatcher) { + warn!("Failed to update some of the peers: {:?}", e); + } } } -} -impl PushWorkHandler for Stratum { - fn push_work_all(&self, payload: String) -> Result<(), Error> { + fn push_work_all(&self, payload: String, tcp_dispatcher: &Dispatcher) -> Result<(), Error> { let hup_peers = { let workers = self.workers.read(); let next_request_id = { @@ -263,7 +227,7 @@ impl PushWorkHandler for Stratum { trace!(target: "stratum", "pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg); for (ref addr, _) in workers.iter() { trace!(target: "stratum", "pusing work to {}", addr); - match self.tcp_dispatcher.push_message(addr, workers_msg.clone()) { + match tcp_dispatcher.push_message(addr, workers_msg.clone()) { Err(PushMessageError::NoSuchPeer) => { trace!(target: "stratum", "Worker no longer connected: {}", &addr); hup_peers.insert(*addr.clone()); @@ -285,7 +249,7 @@ impl PushWorkHandler for Stratum { Ok(()) } - fn push_work(&self, payloads: Vec) -> Result<(), Error> { + fn push_work(&self, payloads: Vec, tcp_dispatcher: &Dispatcher) -> Result<(), Error> { if !payloads.len() > 0 { return Err(Error::NoWork); } @@ -299,16 +263,63 @@ impl PushWorkHandler for Stratum { while que.len() > 0 { let next_worker = addrs[addr_index]; let mut next_payload = que.drain(0..1); - self.tcp_dispatcher.push_message( - next_worker, - next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist") - )?; + tcp_dispatcher.push_message( + next_worker, + next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist") + )?; addr_index = addr_index + 1; } Ok(()) } } +#[derive(Clone)] +pub struct SocketMetadata { + addr: SocketAddr, + // with the new version of jsonrpc-core, SocketMetadata + // won't have to implement default, so this field will not + // have to be an Option + tcp_dispatcher: Option, +} + +impl Default for SocketMetadata { + fn default() -> Self { + SocketMetadata { + addr: "0.0.0.0:0".parse().unwrap(), + tcp_dispatcher: None, + } + } +} + +impl SocketMetadata { + pub fn addr(&self) -> &SocketAddr { + &self.addr + } +} + +impl Metadata for SocketMetadata { } + +pub struct PeerMetaExtractor { + tcp_dispatcher: Dispatcher, +} + +impl PeerMetaExtractor { + fn new(tcp_dispatcher: Dispatcher) -> Self { + PeerMetaExtractor { + tcp_dispatcher, + } + } +} + +impl MetaExtractor for PeerMetaExtractor { + fn extract(&self, context: &RequestContext) -> SocketMetadata { + SocketMetadata { + addr: context.peer_addr, + tcp_dispatcher: Some(self.tcp_dispatcher.clone()), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -367,7 +378,7 @@ mod tests { let stratum = Stratum::start(&addr, Arc::new(VoidManager), None).unwrap(); let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 1}"#; dummy_request(&addr, request); - assert_eq!(1, stratum.subscribers.read().len()); + assert_eq!(1, stratum.implementation.subscribers.read().len()); } struct DummyManager { @@ -409,7 +420,7 @@ mod tests { #[test] fn receives_initial_paylaod() { let addr = SocketAddr::from_str("127.0.0.1:19975").unwrap(); - Stratum::start(&addr, DummyManager::new(), None).expect("There should be no error starting stratum"); + let _stratum = Stratum::start(&addr, DummyManager::new(), None).expect("There should be no error starting stratum"); let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 2}"#; let response = String::from_utf8(dummy_request(&addr, request)).unwrap(); @@ -430,7 +441,7 @@ mod tests { let response = String::from_utf8(dummy_request(&addr, request)).unwrap(); assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#), response); - assert_eq!(1, stratum.workers.read().len()); + assert_eq!(1, stratum.implementation.workers.read().len()); } #[test]