diff --git a/js/src/api/transport/ws/ws.js b/js/src/api/transport/ws/ws.js index 8721bf78a..d4e933917 100644 --- a/js/src/api/transport/ws/ws.js +++ b/js/src/api/transport/ws/ws.js @@ -195,11 +195,6 @@ export default class Ws extends JsonRpcBase { } _onMessage = (event) => { - // Event sent by Signer Broadcaster - if (event.data === 'new_message') { - return false; - } - try { const result = JSON.parse(event.data); const { method, params, json, resolve, reject } = this._messages[result.id]; diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 48e66e322..8b89c179f 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -258,7 +258,7 @@ impl FullDependencies { handler.extend_with(PersonalClient::new(&self.secret_store, dispatcher.clone(), self.geth_compatibility).to_delegate()); }, Api::Signer => { - handler.extend_with(SignerClient::new(&self.secret_store, dispatcher.clone(), &self.signer_service).to_delegate()); + handler.extend_with(SignerClient::new(&self.secret_store, dispatcher.clone(), &self.signer_service, self.remote.clone()).to_delegate()); }, Api::Parity => { let signer = match self.signer_service.is_enabled() { @@ -352,6 +352,7 @@ pub struct LightDependencies { pub dapps_port: Option, pub fetch: FetchClient, pub geth_compatibility: bool, + pub remote: parity_reactor::Remote, } impl Dependencies for LightDependencies { @@ -415,7 +416,7 @@ impl Dependencies for LightDependencies { }, Api::Signer => { let secret_store = Some(self.secret_store.clone()); - handler.extend_with(SignerClient::new(&secret_store, dispatcher.clone(), &self.signer_service).to_delegate()); + handler.extend_with(SignerClient::new(&secret_store, dispatcher.clone(), &self.signer_service, self.remote.clone()).to_delegate()); }, Api::Parity => { let signer = match self.signer_service.is_enabled() { diff --git a/parity/run.rs b/parity/run.rs index 6f0987a7e..8b530adf1 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -292,6 +292,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> }, fetch: fetch, geth_compatibility: cmd.geth_compatibility, + remote: event_loop.remote(), }); let dependencies = rpc::Dependencies { diff --git a/rpc/src/v1/helpers/mod.rs b/rpc/src/v1/helpers/mod.rs index 1e26de852..1f950f113 100644 --- a/rpc/src/v1/helpers/mod.rs +++ b/rpc/src/v1/helpers/mod.rs @@ -33,6 +33,7 @@ mod poll_filter; mod requests; mod signer; mod signing_queue; +mod subscribers; mod subscription_manager; pub use self::dispatch::{Dispatcher, FullDispatcher}; @@ -47,4 +48,5 @@ pub use self::signing_queue::{ QUEUE_LIMIT as SIGNING_QUEUE_LIMIT, }; pub use self::signer::SignerService; +pub use self::subscribers::Subscribers; pub use self::subscription_manager::GenericPollManager; diff --git a/rpc/src/v1/helpers/signing_queue.rs b/rpc/src/v1/helpers/signing_queue.rs index 70f116a4f..e9e6313f4 100644 --- a/rpc/src/v1/helpers/signing_queue.rs +++ b/rpc/src/v1/helpers/signing_queue.rs @@ -16,7 +16,7 @@ use std::mem; use std::cell::RefCell; -use std::sync::{mpsc, Arc}; +use std::sync::Arc; use std::collections::BTreeMap; use jsonrpc_core; use util::{Mutex, RwLock, U256, Address}; @@ -27,7 +27,6 @@ use v1::types::{ConfirmationResponse, H160 as RpcH160, Origin, DappId as RpcDapp /// Result that can be returned from JSON RPC. pub type RpcResult = Result; - /// Type of default account pub enum DefaultAccount { /// Default account is known @@ -49,7 +48,7 @@ impl From for DefaultAccount { } /// Possible events happening in the queue that can be listened to. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum QueueEvent { /// Receiver should stop work upon receiving `Finish` message. Finish, @@ -61,15 +60,6 @@ pub enum QueueEvent { RequestConfirmed(U256), } -/// Defines possible errors returned from queue receiving method. -#[derive(Debug, PartialEq)] -pub enum QueueError { - /// Returned when method has been already used (no receiver available). - AlreadyUsed, - /// Returned when receiver encounters an error. - ReceiverError(mpsc::RecvError), -} - /// Defines possible errors when inserting to queue #[derive(Debug, PartialEq)] pub enum QueueAddError { @@ -184,59 +174,31 @@ impl ConfirmationPromise { /// Queue for all unconfirmed requests. +#[derive(Default)] pub struct ConfirmationsQueue { id: Mutex, queue: RwLock>, - sender: Mutex>, - receiver: Mutex>>, -} - -impl Default for ConfirmationsQueue { - fn default() -> Self { - let (send, recv) = mpsc::channel(); - - ConfirmationsQueue { - id: Mutex::new(U256::from(0)), - queue: RwLock::new(BTreeMap::new()), - sender: Mutex::new(send), - receiver: Mutex::new(Some(recv)), - } - } + on_event: RwLock () + Send + Sync>>>, } impl ConfirmationsQueue { - - /// Blocks the thread and starts listening for notifications regarding all actions in the queue. - /// For each event, `listener` callback will be invoked. - /// This method can be used only once (only single consumer of events can exist). - pub fn start_listening(&self, listener: F) -> Result<(), QueueError> - where F: Fn(QueueEvent) -> () { - let recv = self.receiver.lock().take(); - if let None = recv { - return Err(QueueError::AlreadyUsed); - } - let recv = recv.expect("Check for none is done earlier."); - - loop { - let message = recv.recv().map_err(|e| QueueError::ReceiverError(e))?; - if let QueueEvent::Finish = message { - return Ok(()); - } - - listener(message); - } + /// Adds a queue listener. For each event, `listener` callback will be invoked. + pub fn on_event () + Send + Sync + 'static>(&self, listener: F) { + self.on_event.write().push(Box::new(listener)); } /// Notifies consumer that the communcation is over. /// No more events will be sent after this function is invoked. pub fn finish(&self) { self.notify(QueueEvent::Finish); + self.on_event.write().clear(); } /// Notifies receiver about the event happening in this queue. fn notify(&self, message: QueueEvent) { - // We don't really care about the result - let _ = self.sender.lock().send(message); + for listener in &*self.on_event.read() { + listener(message.clone()) + } } /// Removes requests from this queue and notifies `ConfirmationPromise` holders about the result. @@ -384,26 +346,23 @@ mod test { #[test] fn should_receive_notification() { // given - let received = Arc::new(Mutex::new(None)); + let received = Arc::new(Mutex::new(vec![])); let queue = Arc::new(ConfirmationsQueue::default()); let request = request(); // when - let q = queue.clone(); let r = received.clone(); - let handle = thread::spawn(move || { - q.start_listening(move |notification| { - let mut v = r.lock(); - *v = Some(notification); - }).expect("Should be closed nicely.") + queue.on_event(move |notification| { + r.lock().push(notification); }); queue.add_request(request, Default::default()).unwrap(); queue.finish(); // then - handle.join().expect("Thread should finish nicely"); - let r = received.lock().take(); - assert_eq!(r, Some(QueueEvent::NewRequest(U256::from(1)))); + let r = received.lock(); + assert_eq!(r[0], QueueEvent::NewRequest(U256::from(1))); + assert_eq!(r[1], QueueEvent::Finish); + assert_eq!(r.len(), 2); } #[test] diff --git a/rpc/src/v1/helpers/subscribers.rs b/rpc/src/v1/helpers/subscribers.rs new file mode 100644 index 000000000..a67dbb464 --- /dev/null +++ b/rpc/src/v1/helpers/subscribers.rs @@ -0,0 +1,86 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! A map of subscribers. + +use std::ops; +use std::collections::HashMap; +use jsonrpc_macros::pubsub::{Subscriber, Sink, SubscriptionId}; + +#[derive(Clone, Debug)] +pub struct Subscribers { + next_id: u64, + subscriptions: HashMap, +} + +impl Default for Subscribers { + fn default() -> Self { + Subscribers { + next_id: 0, + subscriptions: HashMap::new(), + } + } +} + +impl Subscribers { + fn next_id(&mut self) -> u64 { + self.next_id += 1; + self.next_id + } + + /// Insert new subscription and return assigned id. + pub fn insert(&mut self, val: T) -> SubscriptionId { + let id = self.next_id(); + debug!(target: "pubsub", "Adding subscription id={}", id); + self.subscriptions.insert(id, val); + SubscriptionId::Number(id) + } + + /// Removes subscription with given id and returns it (if any). + pub fn remove(&mut self, id: &SubscriptionId) -> Option { + trace!(target: "pubsub", "Removing subscription id={:?}", id); + match *id { + SubscriptionId::Number(id) => { + self.subscriptions.remove(&id) + }, + _ => None, + } + } +} + +impl Subscribers> { + /// Assigns id and adds a subscriber to the list. + pub fn push(&mut self, sub: Subscriber) { + let id = self.next_id(); + match sub.assign_id(SubscriptionId::Number(id)) { + Ok(sink) => { + debug!(target: "pubsub", "Adding subscription id={:?}", id); + self.subscriptions.insert(id, sink); + }, + Err(_) => { + self.next_id -= 1; + }, + } + } +} + +impl ops::Deref for Subscribers { + type Target = HashMap; + + fn deref(&self) -> &Self::Target { + &self.subscriptions + } +} diff --git a/rpc/src/v1/helpers/subscription_manager.rs b/rpc/src/v1/helpers/subscription_manager.rs index 4b82fdcc4..6f97b3462 100644 --- a/rpc/src/v1/helpers/subscription_manager.rs +++ b/rpc/src/v1/helpers/subscription_manager.rs @@ -17,14 +17,15 @@ //! Generic poll manager for Pub-Sub. use std::sync::Arc; -use std::collections::HashMap; use util::Mutex; use jsonrpc_core::futures::future::{self, Either}; use jsonrpc_core::futures::sync::mpsc; use jsonrpc_core::futures::{Sink, Future, BoxFuture}; use jsonrpc_core::{self as core, MetaIoHandler}; +use jsonrpc_pubsub::SubscriptionId; +use v1::helpers::Subscribers; use v1::metadata::Metadata; #[derive(Debug)] @@ -40,8 +41,7 @@ struct Subscription { /// TODO [ToDr] Depending on the method decide on poll interval. /// For most of the methods it will be enough to poll on new block instead of time-interval. pub struct GenericPollManager> { - next_id: usize, - poll_subscriptions: HashMap, + subscribers: Subscribers, rpc: MetaIoHandler, } @@ -49,21 +49,16 @@ impl> GenericPollManager { /// Creates new poll manager pub fn new(rpc: MetaIoHandler) -> Self { GenericPollManager { - next_id: 1, - poll_subscriptions: Default::default(), + subscribers: Default::default(), rpc: rpc, } } /// Subscribes to update from polling given method. pub fn subscribe(&mut self, metadata: Metadata, method: String, params: core::Params) - -> (usize, mpsc::Receiver>) + -> (SubscriptionId, mpsc::Receiver>) { - let id = self.next_id; - self.next_id += 1; - let (sink, stream) = mpsc::channel(1); - let subscription = Subscription { metadata: metadata, method: method, @@ -71,21 +66,19 @@ impl> GenericPollManager { sink: sink, last_result: Default::default(), }; - - debug!(target: "pubsub", "Adding subscription id={:?}, {:?}", id, subscription); - self.poll_subscriptions.insert(id, subscription); + let id = self.subscribers.insert(subscription); (id, stream) } - pub fn unsubscribe(&mut self, id: usize) -> bool { + pub fn unsubscribe(&mut self, id: &SubscriptionId) -> bool { debug!(target: "pubsub", "Removing subscription: {:?}", id); - self.poll_subscriptions.remove(&id).is_some() + self.subscribers.remove(id).is_some() } pub fn tick(&self) -> BoxFuture<(), ()> { let mut futures = Vec::new(); // poll all subscriptions - for (id, subscription) in self.poll_subscriptions.iter() { + for (id, subscription) in self.subscribers.iter() { let call = core::MethodCall { jsonrpc: Some(core::Version::V2), id: core::Id::Num(*id as u64), @@ -130,6 +123,7 @@ mod tests { use jsonrpc_core::{MetaIoHandler, NoopMiddleware, Value, Params}; use jsonrpc_core::futures::{Future, Stream}; + use jsonrpc_pubsub::SubscriptionId; use http::tokio_core::reactor; use super::GenericPollManager; @@ -154,7 +148,7 @@ mod tests { let mut el = reactor::Core::new().unwrap(); let mut poll_manager = poll_manager(); let (id, rx) = poll_manager.subscribe(Default::default(), "hello".into(), Params::None); - assert_eq!(id, 1); + assert_eq!(id, SubscriptionId::Number(1)); // then poll_manager.tick().wait().unwrap(); @@ -169,7 +163,7 @@ mod tests { // and no more notifications poll_manager.tick().wait().unwrap(); // we need to unsubscribe otherwise the future will never finish. - poll_manager.unsubscribe(1); + poll_manager.unsubscribe(&id); assert_eq!(el.run(rx.into_future()).unwrap().0, None); } } diff --git a/rpc/src/v1/impls/pubsub.rs b/rpc/src/v1/impls/pubsub.rs index 8badefdb8..ea7bb4e91 100644 --- a/rpc/src/v1/impls/pubsub.rs +++ b/rpc/src/v1/impls/pubsub.rs @@ -70,7 +70,7 @@ impl> PubSub for PubSubClient { let mut poll_manager = self.poll_manager.write(); let (id, receiver) = poll_manager.subscribe(meta, method, params); - match subscriber.assign_id(SubscriptionId::Number(id as u64)) { + match subscriber.assign_id(id.clone()) { Ok(sink) => { self.remote.spawn(receiver.map(|res| match res { Ok(val) => val, @@ -83,18 +83,13 @@ impl> PubSub for PubSubClient { })).map(|_| ())); }, Err(_) => { - poll_manager.unsubscribe(id); + poll_manager.unsubscribe(&id); }, } } fn parity_unsubscribe(&self, id: SubscriptionId) -> BoxFuture { - let res = if let SubscriptionId::Number(id) = id { - self.poll_manager.write().unsubscribe(id as usize) - } else { - false - }; - + let res = self.poll_manager.write().unsubscribe(&id); futures::future::ok(res).boxed() } } diff --git a/rpc/src/v1/impls/signer.rs b/rpc/src/v1/impls/signer.rs index 0e09d1db2..e67042ed0 100644 --- a/rpc/src/v1/impls/signer.rs +++ b/rpc/src/v1/impls/signer.rs @@ -18,16 +18,21 @@ use std::sync::{Arc, Weak}; -use rlp::UntrustedRlp; use ethcore::account_provider::AccountProvider; use ethcore::transaction::{SignedTransaction, PendingTransaction}; use ethkey; use futures::{future, BoxFuture, Future, IntoFuture}; +use parity_reactor::Remote; +use rlp::UntrustedRlp; +use util::Mutex; -use jsonrpc_core::Error; +use jsonrpc_core::{futures, Error}; +use jsonrpc_pubsub::SubscriptionId; +use jsonrpc_macros::pubsub::{Sink, Subscriber}; use v1::helpers::accounts::unwrap_provider; use v1::helpers::dispatch::{self, Dispatcher, WithToken, eth_data_hash}; -use v1::helpers::{errors, SignerService, SigningQueue, ConfirmationPayload, FilledTransactionRequest}; +use v1::helpers::{errors, SignerService, SigningQueue, ConfirmationPayload, FilledTransactionRequest, Subscribers}; +use v1::metadata::Metadata; use v1::traits::Signer; use v1::types::{TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken, U256, Bytes}; @@ -35,21 +40,40 @@ use v1::types::{TransactionModification, ConfirmationRequest, ConfirmationRespon pub struct SignerClient { signer: Weak, accounts: Option>, - dispatcher: D + dispatcher: D, + subscribers: Arc>>>>, } impl SignerClient { - /// Create new instance of signer client. pub fn new( store: &Option>, dispatcher: D, signer: &Arc, + remote: Remote, ) -> Self { + let subscribers = Arc::new(Mutex::new(Subscribers::default())); + let subs = Arc::downgrade(&subscribers); + let s = Arc::downgrade(signer); + signer.queue().on_event(move |_event| { + if let (Some(s), Some(subs)) = (s.upgrade(), subs.upgrade()) { + let requests = s.requests().into_iter().map(Into::into).collect::>(); + for subscription in subs.lock().values() { + let subscription: &Sink<_> = subscription; + remote.spawn(subscription + .notify(requests.clone()) + .map(|_| ()) + .map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e)) + ); + } + } + }); + SignerClient { signer: Arc::downgrade(signer), accounts: store.as_ref().map(Arc::downgrade), dispatcher: dispatcher, + subscribers: subscribers, } } @@ -139,10 +163,10 @@ impl SignerClient { } impl Signer for SignerClient { + type Metadata = Metadata; fn requests_to_confirm(&self) -> Result, Error> { let signer = take_weak!(self.signer); - Ok(signer.requests() .into_iter() .map(Into::into) @@ -214,23 +238,26 @@ impl Signer for SignerClient { } fn reject_request(&self, id: U256) -> Result { - let signer = take_weak!(self.signer); - - let res = signer.request_rejected(id.into()); + let res = take_weak!(self.signer).request_rejected(id.into()); Ok(res.is_some()) } fn generate_token(&self) -> Result { - let signer = take_weak!(self.signer); - - signer.generate_token() + take_weak!(self.signer).generate_token() .map_err(|e| errors::token(e)) } fn generate_web_proxy_token(&self) -> Result { - let signer = take_weak!(self.signer); + Ok(take_weak!(self.signer).generate_web_proxy_access_token()) + } - Ok(signer.generate_web_proxy_access_token()) + fn subscribe_pending(&self, _meta: Self::Metadata, sub: Subscriber>) { + self.subscribers.lock().push(sub) + } + + fn unsubscribe_pending(&self, id: SubscriptionId) -> BoxFuture { + let res = self.subscribers.lock().remove(&id).is_some(); + futures::future::ok(res).boxed() } } diff --git a/rpc/src/v1/tests/mocked/signer.rs b/rpc/src/v1/tests/mocked/signer.rs index d9f7b96a9..afa008cf3 100644 --- a/rpc/src/v1/tests/mocked/signer.rs +++ b/rpc/src/v1/tests/mocked/signer.rs @@ -21,6 +21,7 @@ use util::{U256, Uint, Address, ToPretty}; use ethcore::account_provider::AccountProvider; use ethcore::client::TestBlockChainClient; use ethcore::transaction::{Transaction, Action, SignedTransaction}; +use parity_reactor::EventLoop; use rlp::encode; use serde_json; @@ -40,6 +41,7 @@ struct SignerTester { // these unused fields are necessary to keep the data alive // as the handler has only weak pointers. _client: Arc, + _event_loop: EventLoop, } fn blockchain_client() -> Arc { @@ -61,10 +63,11 @@ fn signer_tester() -> SignerTester { let opt_accounts = Some(accounts.clone()); let client = blockchain_client(); let miner = miner_service(); + let event_loop = EventLoop::spawn(); let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner)); let mut io = IoHandler::default(); - io.extend_with(SignerClient::new(&opt_accounts, dispatcher, &signer).to_delegate()); + io.extend_with(SignerClient::new(&opt_accounts, dispatcher, &signer, event_loop.remote()).to_delegate()); SignerTester { signer: signer, @@ -72,6 +75,7 @@ fn signer_tester() -> SignerTester { io: io, miner: miner, _client: client, + _event_loop: event_loop, } } diff --git a/rpc/src/v1/traits/signer.rs b/rpc/src/v1/traits/signer.rs index 1b03f6d8a..0280f9612 100644 --- a/rpc/src/v1/traits/signer.rs +++ b/rpc/src/v1/traits/signer.rs @@ -16,6 +16,8 @@ //! Parity Signer-related rpc interface. use jsonrpc_core::Error; +use jsonrpc_pubsub::SubscriptionId; +use jsonrpc_macros::pubsub::Subscriber; use futures::BoxFuture; use v1::types::{U256, Bytes, TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken}; @@ -23,6 +25,7 @@ use v1::types::{U256, Bytes, TransactionModification, ConfirmationRequest, Confi build_rpc_trait! { /// Signer extension for confirmations rpc interface. pub trait Signer { + type Metadata; /// Returns a list of items to confirm. #[rpc(name = "signer_requestsToConfirm")] @@ -51,5 +54,15 @@ build_rpc_trait! { /// Generates new web proxy access token. #[rpc(name = "signer_generateWebProxyAccessToken")] fn generate_web_proxy_token(&self) -> Result; + + #[pubsub(name = "signer_pending")] { + /// Subscribe to new pending requests on signer interface. + #[rpc(name = "signer_subscribePending")] + fn subscribe_pending(&self, Self::Metadata, Subscriber>); + + /// Unsubscribe from pending requests subscription. + #[rpc(name = "signer_unsubscribePending")] + fn unsubscribe_pending(&self, SubscriptionId) -> BoxFuture; + } } } diff --git a/signer/src/ws_server/mod.rs b/signer/src/ws_server/mod.rs index 314351938..7bff6cf19 100644 --- a/signer/src/ws_server/mod.rs +++ b/signer/src/ws_server/mod.rs @@ -127,7 +127,7 @@ impl ServerBuilder { /// `WebSockets` server implementation. pub struct Server { handle: Option>, - broadcaster_handle: Option>, + broadcaster: ws::Sender, queue: Arc, panic_handler: Arc, addr: SocketAddr, @@ -188,27 +188,10 @@ impl Server { }).unwrap(); }); - // Spawn a thread for broadcasting - let ph = panic_handler.clone(); - let q = queue.clone(); - let broadcaster_handle = thread::spawn(move || { - ph.catch_panic(move || { - q.start_listening(|_message| { - // TODO [ToDr] Some better structure here for messages. - broadcaster.send("new_message").unwrap(); - }).expect("It's the only place we are running start_listening. It shouldn't fail."); - let res = broadcaster.shutdown(); - - if let Err(e) = res { - warn!("Signer: Broadcaster was not closed cleanly. Details: {:?}", e); - } - }).unwrap() - }); - // Return a handle Ok(Server { handle: Some(handle), - broadcaster_handle: Some(broadcaster_handle), + broadcaster: broadcaster, queue: queue, panic_handler: panic_handler, addr: addr, @@ -225,7 +208,7 @@ impl MayPanic for Server { impl Drop for Server { fn drop(&mut self) { self.queue.finish(); - self.broadcaster_handle.take().unwrap().join().unwrap(); + self.broadcaster.shutdown().unwrap(); self.handle.take().unwrap().join().unwrap(); } }