Proper signer Pub-Sub for pending requests. (#5594)

* Signer subscription.

* Fixing RPC tests.

* Improve notification performance.
This commit is contained in:
Tomasz Drwięga 2017-05-17 16:20:41 +02:00 committed by Gav Wood
parent da8be072fa
commit 240704fb54
12 changed files with 187 additions and 127 deletions

View File

@ -195,11 +195,6 @@ export default class Ws extends JsonRpcBase {
}
_onMessage = (event) => {
// Event sent by Signer Broadcaster
if (event.data === 'new_message') {
return false;
}
try {
const result = JSON.parse(event.data);
const { method, params, json, resolve, reject } = this._messages[result.id];

View File

@ -258,7 +258,7 @@ impl FullDependencies {
handler.extend_with(PersonalClient::new(&self.secret_store, dispatcher.clone(), self.geth_compatibility).to_delegate());
},
Api::Signer => {
handler.extend_with(SignerClient::new(&self.secret_store, dispatcher.clone(), &self.signer_service).to_delegate());
handler.extend_with(SignerClient::new(&self.secret_store, dispatcher.clone(), &self.signer_service, self.remote.clone()).to_delegate());
},
Api::Parity => {
let signer = match self.signer_service.is_enabled() {
@ -352,6 +352,7 @@ pub struct LightDependencies {
pub dapps_port: Option<u16>,
pub fetch: FetchClient,
pub geth_compatibility: bool,
pub remote: parity_reactor::Remote,
}
impl Dependencies for LightDependencies {
@ -415,7 +416,7 @@ impl Dependencies for LightDependencies {
},
Api::Signer => {
let secret_store = Some(self.secret_store.clone());
handler.extend_with(SignerClient::new(&secret_store, dispatcher.clone(), &self.signer_service).to_delegate());
handler.extend_with(SignerClient::new(&secret_store, dispatcher.clone(), &self.signer_service, self.remote.clone()).to_delegate());
},
Api::Parity => {
let signer = match self.signer_service.is_enabled() {

View File

@ -292,6 +292,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
},
fetch: fetch,
geth_compatibility: cmd.geth_compatibility,
remote: event_loop.remote(),
});
let dependencies = rpc::Dependencies {

View File

@ -33,6 +33,7 @@ mod poll_filter;
mod requests;
mod signer;
mod signing_queue;
mod subscribers;
mod subscription_manager;
pub use self::dispatch::{Dispatcher, FullDispatcher};
@ -47,4 +48,5 @@ pub use self::signing_queue::{
QUEUE_LIMIT as SIGNING_QUEUE_LIMIT,
};
pub use self::signer::SignerService;
pub use self::subscribers::Subscribers;
pub use self::subscription_manager::GenericPollManager;

View File

@ -16,7 +16,7 @@
use std::mem;
use std::cell::RefCell;
use std::sync::{mpsc, Arc};
use std::sync::Arc;
use std::collections::BTreeMap;
use jsonrpc_core;
use util::{Mutex, RwLock, U256, Address};
@ -27,7 +27,6 @@ use v1::types::{ConfirmationResponse, H160 as RpcH160, Origin, DappId as RpcDapp
/// Result that can be returned from JSON RPC.
pub type RpcResult = Result<ConfirmationResponse, jsonrpc_core::Error>;
/// Type of default account
pub enum DefaultAccount {
/// Default account is known
@ -49,7 +48,7 @@ impl From<RpcH160> for DefaultAccount {
}
/// Possible events happening in the queue that can be listened to.
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum QueueEvent {
/// Receiver should stop work upon receiving `Finish` message.
Finish,
@ -61,15 +60,6 @@ pub enum QueueEvent {
RequestConfirmed(U256),
}
/// Defines possible errors returned from queue receiving method.
#[derive(Debug, PartialEq)]
pub enum QueueError {
/// Returned when method has been already used (no receiver available).
AlreadyUsed,
/// Returned when receiver encounters an error.
ReceiverError(mpsc::RecvError),
}
/// Defines possible errors when inserting to queue
#[derive(Debug, PartialEq)]
pub enum QueueAddError {
@ -184,59 +174,31 @@ impl ConfirmationPromise {
/// Queue for all unconfirmed requests.
#[derive(Default)]
pub struct ConfirmationsQueue {
id: Mutex<U256>,
queue: RwLock<BTreeMap<U256, ConfirmationToken>>,
sender: Mutex<mpsc::Sender<QueueEvent>>,
receiver: Mutex<Option<mpsc::Receiver<QueueEvent>>>,
}
impl Default for ConfirmationsQueue {
fn default() -> Self {
let (send, recv) = mpsc::channel();
ConfirmationsQueue {
id: Mutex::new(U256::from(0)),
queue: RwLock::new(BTreeMap::new()),
sender: Mutex::new(send),
receiver: Mutex::new(Some(recv)),
}
}
on_event: RwLock<Vec<Box<Fn(QueueEvent) -> () + Send + Sync>>>,
}
impl ConfirmationsQueue {
/// Blocks the thread and starts listening for notifications regarding all actions in the queue.
/// For each event, `listener` callback will be invoked.
/// This method can be used only once (only single consumer of events can exist).
pub fn start_listening<F>(&self, listener: F) -> Result<(), QueueError>
where F: Fn(QueueEvent) -> () {
let recv = self.receiver.lock().take();
if let None = recv {
return Err(QueueError::AlreadyUsed);
}
let recv = recv.expect("Check for none is done earlier.");
loop {
let message = recv.recv().map_err(|e| QueueError::ReceiverError(e))?;
if let QueueEvent::Finish = message {
return Ok(());
}
listener(message);
}
/// Adds a queue listener. For each event, `listener` callback will be invoked.
pub fn on_event<F: Fn(QueueEvent) -> () + Send + Sync + 'static>(&self, listener: F) {
self.on_event.write().push(Box::new(listener));
}
/// Notifies consumer that the communcation is over.
/// No more events will be sent after this function is invoked.
pub fn finish(&self) {
self.notify(QueueEvent::Finish);
self.on_event.write().clear();
}
/// Notifies receiver about the event happening in this queue.
fn notify(&self, message: QueueEvent) {
// We don't really care about the result
let _ = self.sender.lock().send(message);
for listener in &*self.on_event.read() {
listener(message.clone())
}
}
/// Removes requests from this queue and notifies `ConfirmationPromise` holders about the result.
@ -384,26 +346,23 @@ mod test {
#[test]
fn should_receive_notification() {
// given
let received = Arc::new(Mutex::new(None));
let received = Arc::new(Mutex::new(vec![]));
let queue = Arc::new(ConfirmationsQueue::default());
let request = request();
// when
let q = queue.clone();
let r = received.clone();
let handle = thread::spawn(move || {
q.start_listening(move |notification| {
let mut v = r.lock();
*v = Some(notification);
}).expect("Should be closed nicely.")
queue.on_event(move |notification| {
r.lock().push(notification);
});
queue.add_request(request, Default::default()).unwrap();
queue.finish();
// then
handle.join().expect("Thread should finish nicely");
let r = received.lock().take();
assert_eq!(r, Some(QueueEvent::NewRequest(U256::from(1))));
let r = received.lock();
assert_eq!(r[0], QueueEvent::NewRequest(U256::from(1)));
assert_eq!(r[1], QueueEvent::Finish);
assert_eq!(r.len(), 2);
}
#[test]

View File

@ -0,0 +1,86 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! A map of subscribers.
use std::ops;
use std::collections::HashMap;
use jsonrpc_macros::pubsub::{Subscriber, Sink, SubscriptionId};
#[derive(Clone, Debug)]
pub struct Subscribers<T> {
next_id: u64,
subscriptions: HashMap<u64, T>,
}
impl<T> Default for Subscribers<T> {
fn default() -> Self {
Subscribers {
next_id: 0,
subscriptions: HashMap::new(),
}
}
}
impl<T> Subscribers<T> {
fn next_id(&mut self) -> u64 {
self.next_id += 1;
self.next_id
}
/// Insert new subscription and return assigned id.
pub fn insert(&mut self, val: T) -> SubscriptionId {
let id = self.next_id();
debug!(target: "pubsub", "Adding subscription id={}", id);
self.subscriptions.insert(id, val);
SubscriptionId::Number(id)
}
/// Removes subscription with given id and returns it (if any).
pub fn remove(&mut self, id: &SubscriptionId) -> Option<T> {
trace!(target: "pubsub", "Removing subscription id={:?}", id);
match *id {
SubscriptionId::Number(id) => {
self.subscriptions.remove(&id)
},
_ => None,
}
}
}
impl <T> Subscribers<Sink<T>> {
/// Assigns id and adds a subscriber to the list.
pub fn push(&mut self, sub: Subscriber<T>) {
let id = self.next_id();
match sub.assign_id(SubscriptionId::Number(id)) {
Ok(sink) => {
debug!(target: "pubsub", "Adding subscription id={:?}", id);
self.subscriptions.insert(id, sink);
},
Err(_) => {
self.next_id -= 1;
},
}
}
}
impl<T> ops::Deref for Subscribers<T> {
type Target = HashMap<u64, T>;
fn deref(&self) -> &Self::Target {
&self.subscriptions
}
}

View File

@ -17,14 +17,15 @@
//! Generic poll manager for Pub-Sub.
use std::sync::Arc;
use std::collections::HashMap;
use util::Mutex;
use jsonrpc_core::futures::future::{self, Either};
use jsonrpc_core::futures::sync::mpsc;
use jsonrpc_core::futures::{Sink, Future, BoxFuture};
use jsonrpc_core::{self as core, MetaIoHandler};
use jsonrpc_pubsub::SubscriptionId;
use v1::helpers::Subscribers;
use v1::metadata::Metadata;
#[derive(Debug)]
@ -40,8 +41,7 @@ struct Subscription {
/// TODO [ToDr] Depending on the method decide on poll interval.
/// For most of the methods it will be enough to poll on new block instead of time-interval.
pub struct GenericPollManager<S: core::Middleware<Metadata>> {
next_id: usize,
poll_subscriptions: HashMap<usize, Subscription>,
subscribers: Subscribers<Subscription>,
rpc: MetaIoHandler<Metadata, S>,
}
@ -49,21 +49,16 @@ impl<S: core::Middleware<Metadata>> GenericPollManager<S> {
/// Creates new poll manager
pub fn new(rpc: MetaIoHandler<Metadata, S>) -> Self {
GenericPollManager {
next_id: 1,
poll_subscriptions: Default::default(),
subscribers: Default::default(),
rpc: rpc,
}
}
/// Subscribes to update from polling given method.
pub fn subscribe(&mut self, metadata: Metadata, method: String, params: core::Params)
-> (usize, mpsc::Receiver<Result<core::Value, core::Error>>)
-> (SubscriptionId, mpsc::Receiver<Result<core::Value, core::Error>>)
{
let id = self.next_id;
self.next_id += 1;
let (sink, stream) = mpsc::channel(1);
let subscription = Subscription {
metadata: metadata,
method: method,
@ -71,21 +66,19 @@ impl<S: core::Middleware<Metadata>> GenericPollManager<S> {
sink: sink,
last_result: Default::default(),
};
debug!(target: "pubsub", "Adding subscription id={:?}, {:?}", id, subscription);
self.poll_subscriptions.insert(id, subscription);
let id = self.subscribers.insert(subscription);
(id, stream)
}
pub fn unsubscribe(&mut self, id: usize) -> bool {
pub fn unsubscribe(&mut self, id: &SubscriptionId) -> bool {
debug!(target: "pubsub", "Removing subscription: {:?}", id);
self.poll_subscriptions.remove(&id).is_some()
self.subscribers.remove(id).is_some()
}
pub fn tick(&self) -> BoxFuture<(), ()> {
let mut futures = Vec::new();
// poll all subscriptions
for (id, subscription) in self.poll_subscriptions.iter() {
for (id, subscription) in self.subscribers.iter() {
let call = core::MethodCall {
jsonrpc: Some(core::Version::V2),
id: core::Id::Num(*id as u64),
@ -130,6 +123,7 @@ mod tests {
use jsonrpc_core::{MetaIoHandler, NoopMiddleware, Value, Params};
use jsonrpc_core::futures::{Future, Stream};
use jsonrpc_pubsub::SubscriptionId;
use http::tokio_core::reactor;
use super::GenericPollManager;
@ -154,7 +148,7 @@ mod tests {
let mut el = reactor::Core::new().unwrap();
let mut poll_manager = poll_manager();
let (id, rx) = poll_manager.subscribe(Default::default(), "hello".into(), Params::None);
assert_eq!(id, 1);
assert_eq!(id, SubscriptionId::Number(1));
// then
poll_manager.tick().wait().unwrap();
@ -169,7 +163,7 @@ mod tests {
// and no more notifications
poll_manager.tick().wait().unwrap();
// we need to unsubscribe otherwise the future will never finish.
poll_manager.unsubscribe(1);
poll_manager.unsubscribe(&id);
assert_eq!(el.run(rx.into_future()).unwrap().0, None);
}
}

View File

@ -70,7 +70,7 @@ impl<S: core::Middleware<Metadata>> PubSub for PubSubClient<S> {
let mut poll_manager = self.poll_manager.write();
let (id, receiver) = poll_manager.subscribe(meta, method, params);
match subscriber.assign_id(SubscriptionId::Number(id as u64)) {
match subscriber.assign_id(id.clone()) {
Ok(sink) => {
self.remote.spawn(receiver.map(|res| match res {
Ok(val) => val,
@ -83,18 +83,13 @@ impl<S: core::Middleware<Metadata>> PubSub for PubSubClient<S> {
})).map(|_| ()));
},
Err(_) => {
poll_manager.unsubscribe(id);
poll_manager.unsubscribe(&id);
},
}
}
fn parity_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<bool, Error> {
let res = if let SubscriptionId::Number(id) = id {
self.poll_manager.write().unsubscribe(id as usize)
} else {
false
};
let res = self.poll_manager.write().unsubscribe(&id);
futures::future::ok(res).boxed()
}
}

View File

@ -18,16 +18,21 @@
use std::sync::{Arc, Weak};
use rlp::UntrustedRlp;
use ethcore::account_provider::AccountProvider;
use ethcore::transaction::{SignedTransaction, PendingTransaction};
use ethkey;
use futures::{future, BoxFuture, Future, IntoFuture};
use parity_reactor::Remote;
use rlp::UntrustedRlp;
use util::Mutex;
use jsonrpc_core::Error;
use jsonrpc_core::{futures, Error};
use jsonrpc_pubsub::SubscriptionId;
use jsonrpc_macros::pubsub::{Sink, Subscriber};
use v1::helpers::accounts::unwrap_provider;
use v1::helpers::dispatch::{self, Dispatcher, WithToken, eth_data_hash};
use v1::helpers::{errors, SignerService, SigningQueue, ConfirmationPayload, FilledTransactionRequest};
use v1::helpers::{errors, SignerService, SigningQueue, ConfirmationPayload, FilledTransactionRequest, Subscribers};
use v1::metadata::Metadata;
use v1::traits::Signer;
use v1::types::{TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken, U256, Bytes};
@ -35,21 +40,40 @@ use v1::types::{TransactionModification, ConfirmationRequest, ConfirmationRespon
pub struct SignerClient<D: Dispatcher> {
signer: Weak<SignerService>,
accounts: Option<Weak<AccountProvider>>,
dispatcher: D
dispatcher: D,
subscribers: Arc<Mutex<Subscribers<Sink<Vec<ConfirmationRequest>>>>>,
}
impl<D: Dispatcher + 'static> SignerClient<D> {
/// Create new instance of signer client.
pub fn new(
store: &Option<Arc<AccountProvider>>,
dispatcher: D,
signer: &Arc<SignerService>,
remote: Remote,
) -> Self {
let subscribers = Arc::new(Mutex::new(Subscribers::default()));
let subs = Arc::downgrade(&subscribers);
let s = Arc::downgrade(signer);
signer.queue().on_event(move |_event| {
if let (Some(s), Some(subs)) = (s.upgrade(), subs.upgrade()) {
let requests = s.requests().into_iter().map(Into::into).collect::<Vec<ConfirmationRequest>>();
for subscription in subs.lock().values() {
let subscription: &Sink<_> = subscription;
remote.spawn(subscription
.notify(requests.clone())
.map(|_| ())
.map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e))
);
}
}
});
SignerClient {
signer: Arc::downgrade(signer),
accounts: store.as_ref().map(Arc::downgrade),
dispatcher: dispatcher,
subscribers: subscribers,
}
}
@ -139,10 +163,10 @@ impl<D: Dispatcher + 'static> SignerClient<D> {
}
impl<D: Dispatcher + 'static> Signer for SignerClient<D> {
type Metadata = Metadata;
fn requests_to_confirm(&self) -> Result<Vec<ConfirmationRequest>, Error> {
let signer = take_weak!(self.signer);
Ok(signer.requests()
.into_iter()
.map(Into::into)
@ -214,23 +238,26 @@ impl<D: Dispatcher + 'static> Signer for SignerClient<D> {
}
fn reject_request(&self, id: U256) -> Result<bool, Error> {
let signer = take_weak!(self.signer);
let res = signer.request_rejected(id.into());
let res = take_weak!(self.signer).request_rejected(id.into());
Ok(res.is_some())
}
fn generate_token(&self) -> Result<String, Error> {
let signer = take_weak!(self.signer);
signer.generate_token()
take_weak!(self.signer).generate_token()
.map_err(|e| errors::token(e))
}
fn generate_web_proxy_token(&self) -> Result<String, Error> {
let signer = take_weak!(self.signer);
Ok(take_weak!(self.signer).generate_web_proxy_access_token())
}
Ok(signer.generate_web_proxy_access_token())
fn subscribe_pending(&self, _meta: Self::Metadata, sub: Subscriber<Vec<ConfirmationRequest>>) {
self.subscribers.lock().push(sub)
}
fn unsubscribe_pending(&self, id: SubscriptionId) -> BoxFuture<bool, Error> {
let res = self.subscribers.lock().remove(&id).is_some();
futures::future::ok(res).boxed()
}
}

View File

@ -21,6 +21,7 @@ use util::{U256, Uint, Address, ToPretty};
use ethcore::account_provider::AccountProvider;
use ethcore::client::TestBlockChainClient;
use ethcore::transaction::{Transaction, Action, SignedTransaction};
use parity_reactor::EventLoop;
use rlp::encode;
use serde_json;
@ -40,6 +41,7 @@ struct SignerTester {
// these unused fields are necessary to keep the data alive
// as the handler has only weak pointers.
_client: Arc<TestBlockChainClient>,
_event_loop: EventLoop,
}
fn blockchain_client() -> Arc<TestBlockChainClient> {
@ -61,10 +63,11 @@ fn signer_tester() -> SignerTester {
let opt_accounts = Some(accounts.clone());
let client = blockchain_client();
let miner = miner_service();
let event_loop = EventLoop::spawn();
let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner));
let mut io = IoHandler::default();
io.extend_with(SignerClient::new(&opt_accounts, dispatcher, &signer).to_delegate());
io.extend_with(SignerClient::new(&opt_accounts, dispatcher, &signer, event_loop.remote()).to_delegate());
SignerTester {
signer: signer,
@ -72,6 +75,7 @@ fn signer_tester() -> SignerTester {
io: io,
miner: miner,
_client: client,
_event_loop: event_loop,
}
}

View File

@ -16,6 +16,8 @@
//! Parity Signer-related rpc interface.
use jsonrpc_core::Error;
use jsonrpc_pubsub::SubscriptionId;
use jsonrpc_macros::pubsub::Subscriber;
use futures::BoxFuture;
use v1::types::{U256, Bytes, TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken};
@ -23,6 +25,7 @@ use v1::types::{U256, Bytes, TransactionModification, ConfirmationRequest, Confi
build_rpc_trait! {
/// Signer extension for confirmations rpc interface.
pub trait Signer {
type Metadata;
/// Returns a list of items to confirm.
#[rpc(name = "signer_requestsToConfirm")]
@ -51,5 +54,15 @@ build_rpc_trait! {
/// Generates new web proxy access token.
#[rpc(name = "signer_generateWebProxyAccessToken")]
fn generate_web_proxy_token(&self) -> Result<String, Error>;
#[pubsub(name = "signer_pending")] {
/// Subscribe to new pending requests on signer interface.
#[rpc(name = "signer_subscribePending")]
fn subscribe_pending(&self, Self::Metadata, Subscriber<Vec<ConfirmationRequest>>);
/// Unsubscribe from pending requests subscription.
#[rpc(name = "signer_unsubscribePending")]
fn unsubscribe_pending(&self, SubscriptionId) -> BoxFuture<bool, Error>;
}
}
}

View File

@ -127,7 +127,7 @@ impl ServerBuilder {
/// `WebSockets` server implementation.
pub struct Server {
handle: Option<thread::JoinHandle<()>>,
broadcaster_handle: Option<thread::JoinHandle<()>>,
broadcaster: ws::Sender,
queue: Arc<ConfirmationsQueue>,
panic_handler: Arc<PanicHandler>,
addr: SocketAddr,
@ -188,27 +188,10 @@ impl Server {
}).unwrap();
});
// Spawn a thread for broadcasting
let ph = panic_handler.clone();
let q = queue.clone();
let broadcaster_handle = thread::spawn(move || {
ph.catch_panic(move || {
q.start_listening(|_message| {
// TODO [ToDr] Some better structure here for messages.
broadcaster.send("new_message").unwrap();
}).expect("It's the only place we are running start_listening. It shouldn't fail.");
let res = broadcaster.shutdown();
if let Err(e) = res {
warn!("Signer: Broadcaster was not closed cleanly. Details: {:?}", e);
}
}).unwrap()
});
// Return a handle
Ok(Server {
handle: Some(handle),
broadcaster_handle: Some(broadcaster_handle),
broadcaster: broadcaster,
queue: queue,
panic_handler: panic_handler,
addr: addr,
@ -225,7 +208,7 @@ impl MayPanic for Server {
impl Drop for Server {
fn drop(&mut self) {
self.queue.finish();
self.broadcaster_handle.take().unwrap().join().unwrap();
self.broadcaster.shutdown().unwrap();
self.handle.take().unwrap().join().unwrap();
}
}