New version of jsonrpc.

This commit is contained in:
Tomasz Drwięga
2017-03-13 15:49:52 +01:00
parent be87151f1c
commit 8bf5be0cc4
28 changed files with 469 additions and 480 deletions

View File

@@ -44,8 +44,8 @@ pub use traits::{
};
use jsonrpc_tcp_server::{
Server as JsonRpcServer, RequestContext, MetaExtractor, Dispatcher,
PushMessageError
Server as JsonRpcServer, ServerBuilder as JsonRpcServerBuilder,
RequestContext, MetaExtractor, Dispatcher, PushMessageError,
};
use jsonrpc_core::{MetaIoHandler, Params, to_value, Value, Metadata, Compatibility};
use jsonrpc_macros::IoDelegate;
@@ -57,6 +57,8 @@ use util::{H256, Hashable, RwLock, RwLockReadGuard};
type RpcResult = BoxFuture<jsonrpc_core::Value, jsonrpc_core::Error>;
const NOTIFY_COUNTER_INITIAL: u32 = 16;
struct StratumRpc {
stratum: RwLock<Option<Arc<Stratum>>>,
}
@@ -112,7 +114,7 @@ impl MetaExtractor<SocketMetadata> for PeerMetaExtractor {
}
pub struct Stratum {
rpc_server: JsonRpcServer<SocketMetadata>,
rpc_server: Option<JsonRpcServer>,
/// Subscribed clients
subscribers: RwLock<Vec<SocketAddr>>,
/// List of workers supposed to receive job update
@@ -129,7 +131,11 @@ pub struct Stratum {
tcp_dispatcher: Dispatcher,
}
const NOTIFY_COUNTER_INITIAL: u32 = 16;
impl Drop for Stratum {
fn drop(&mut self) {
self.rpc_server.take().map(|server| server.close());
}
}
impl Stratum {
pub fn start(
@@ -148,12 +154,14 @@ impl Stratum {
let mut handler = MetaIoHandler::<SocketMetadata>::with_compatibility(Compatibility::Both);
handler.extend_with(delegate);
let server = JsonRpcServer::new(addr.clone(), Arc::new(handler))
.extractor(Arc::new(PeerMetaExtractor) as Arc<MetaExtractor<SocketMetadata>>);
let server = JsonRpcServerBuilder::new(handler)
.session_meta_extractor(PeerMetaExtractor);
let tcp_dispatcher = server.dispatcher();
let server = server.start(addr)?;
let stratum = Arc::new(Stratum {
tcp_dispatcher: server.dispatcher(),
rpc_server: server,
tcp_dispatcher: tcp_dispatcher,
rpc_server: Some(server),
subscribers: RwLock::new(Vec::new()),
job_que: RwLock::new(HashSet::new()),
dispatcher: dispatcher,
@@ -162,10 +170,6 @@ impl Stratum {
notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL),
});
*rpc.stratum.write() = Some(stratum.clone());
let running_stratum = stratum.clone();
::std::thread::spawn(move || running_stratum.rpc_server.run());
Ok(stratum)
}