From 86c263328060489648ad2aad5585ec24c00e85c6 Mon Sep 17 00:00:00 2001 From: Kirill Pimenov Date: Tue, 17 Oct 2017 14:50:53 +0200 Subject: [PATCH] Migrate to Futures in SigningQueue (#6689) * oneshot channels instead of custom promises * Future instead of handle_dispatch * Even less copying * Those explicit waits were a mistake, thanks, @tomusdrw * No more unsafe polling * Test for the new `is_done()` method * Mark Futures as `#[must_use]` * Solve most compilation warnings * `try_ready!` is more ideomatic * Turn spaces into tabs * Documentation and visibility improvements * Minor code style improvements * Make Futures run on an explisit reactor * Another round of code style issues * Simplify ConfirmationReceiver type * Flatten ConfirmationOutcome into a plain Result type * Get rid of a separate `pending` set, it was a stupid idea * Clarify `add_request` docs * No need to reduce the scope of the mutex here --- Cargo.lock | 1 + parity/rpc_apis.rs | 4 +- rpc/Cargo.toml | 1 + rpc/src/lib.rs | 3 + rpc/src/v1/helpers/mod.rs | 3 +- rpc/src/v1/helpers/signing_queue.rs | 171 +++++------------- rpc/src/v1/impls/signing.rs | 200 +++++++++------------- rpc/src/v1/tests/helpers/sync_provider.rs | 8 +- rpc/src/v1/tests/mocked/signer.rs | 26 +-- rpc/src/v1/tests/mocked/signing.rs | 12 +- rpc/src/v1/types/confirmations.rs | 8 +- rpc/src/v1/types/transaction_request.rs | 2 +- 12 files changed, 173 insertions(+), 266 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8930a592f..c9e1d7683 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2137,6 +2137,7 @@ dependencies = [ "ethstore 0.1.0", "ethsync 1.9.0", "fetch 0.1.0", + "futures 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "hardware-wallet 1.9.0", "hash 0.1.0", diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index d348202a9..6064d3d2f 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -244,7 +244,7 @@ impl FullDependencies { let deps = &$deps; let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone()); if deps.signer_service.is_enabled() { - $handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, &deps.secret_store))) + $handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, deps.remote.clone(), &deps.secret_store))) } else { $handler.extend_with($namespace::to_delegate(SigningUnsafeClient::new(&deps.secret_store, dispatcher))) } @@ -445,7 +445,7 @@ impl LightDependencies { let secret_store = Some(deps.secret_store.clone()); if deps.signer_service.is_enabled() { $handler.extend_with($namespace::to_delegate( - SigningQueueClient::new(&deps.signer_service, dispatcher, &secret_store) + SigningQueueClient::new(&deps.signer_service, dispatcher, deps.remote.clone(), &secret_store) )) } else { $handler.extend_with( diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 7ad76a284..dc70406c7 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -10,6 +10,7 @@ authors = ["Parity Technologies "] [dependencies] ansi_term = "0.9" cid = "0.2" +futures = "0.1.6" futures-cpupool = "0.1" log = "0.3" multihash ="0.6" diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 9636f85cc..ab0b9082d 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -20,6 +20,9 @@ #![cfg_attr(feature="dev", feature(plugin))] #![cfg_attr(feature="dev", plugin(clippy))] +#[macro_use] +extern crate futures; + extern crate ansi_term; extern crate cid; extern crate crypto as rust_crypto; diff --git a/rpc/src/v1/helpers/mod.rs b/rpc/src/v1/helpers/mod.rs index f330c75eb..3e065c5ef 100644 --- a/rpc/src/v1/helpers/mod.rs +++ b/rpc/src/v1/helpers/mod.rs @@ -44,7 +44,8 @@ pub use self::requests::{ TransactionRequest, FilledTransactionRequest, ConfirmationRequest, ConfirmationPayload, CallRequest, }; pub use self::signing_queue::{ - ConfirmationsQueue, ConfirmationPromise, ConfirmationResult, SigningQueue, QueueEvent, DefaultAccount, + ConfirmationsQueue, ConfirmationReceiver, ConfirmationResult, + SigningQueue, QueueEvent, DefaultAccount, QUEUE_LIMIT as SIGNING_QUEUE_LIMIT, }; pub use self::signer::SignerService; diff --git a/rpc/src/v1/helpers/signing_queue.rs b/rpc/src/v1/helpers/signing_queue.rs index dfe198e4a..e61f192ea 100644 --- a/rpc/src/v1/helpers/signing_queue.rs +++ b/rpc/src/v1/helpers/signing_queue.rs @@ -14,20 +14,18 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::mem; -use std::cell::RefCell; -use std::sync::Arc; use std::collections::BTreeMap; -use jsonrpc_core; use bigint::prelude::U256; use util::Address; use parking_lot::{Mutex, RwLock}; use ethcore::account_provider::DappId; -use v1::helpers::{ConfirmationRequest, ConfirmationPayload}; +use v1::helpers::{ConfirmationRequest, ConfirmationPayload, oneshot, errors}; use v1::types::{ConfirmationResponse, H160 as RpcH160, Origin, DappId as RpcDappId}; +use jsonrpc_core::Error; + /// Result that can be returned from JSON RPC. -pub type RpcResult = Result; +pub type ConfirmationResult = Result; /// Type of default account pub enum DefaultAccount { @@ -74,8 +72,9 @@ pub const QUEUE_LIMIT: usize = 50; /// A queue of transactions awaiting to be confirmed and signed. pub trait SigningQueue: Send + Sync { /// Add new request to the queue. - /// Returns a `ConfirmationPromise` that can be used to await for resolution of given request. - fn add_request(&self, request: ConfirmationPayload, origin: Origin) -> Result; + /// Returns a `Result` wrapping `ConfirmationReceiver` together with it's unique id in the queue. + /// `ConfirmationReceiver` is a `Future` awaiting for resolution of the given request. + fn add_request(&self, request: ConfirmationPayload, origin: Origin) -> Result<(U256, ConfirmationReceiver), QueueAddError>; /// Removes a request from the queue. /// Notifies possible token holders that request was rejected. @@ -83,7 +82,7 @@ pub trait SigningQueue: Send + Sync { /// Removes a request from the queue. /// Notifies possible token holders that request was confirmed and given hash was assigned. - fn request_confirmed(&self, id: U256, result: RpcResult) -> Option; + fn request_confirmed(&self, id: U256, result: ConfirmationResult) -> Option; /// Returns a request if it is contained in the queue. fn peek(&self, id: &U256) -> Option; @@ -98,88 +97,20 @@ pub trait SigningQueue: Send + Sync { fn is_empty(&self) -> bool; } -#[derive(Debug, Clone, PartialEq)] -/// Result of a pending confirmation request. -pub enum ConfirmationResult { - /// The request has not yet been confirmed nor rejected. - Waiting, - /// The request has been rejected. - Rejected, - /// The request has been confirmed. - Confirmed(RpcResult), -} - -type Listener = Box) + Send>; - -/// A handle to submitted request. -/// Allows to block and wait for a resolution of that request. -pub struct ConfirmationToken { - result: Arc>, - listeners: Arc>>, +struct ConfirmationSender { + sender: oneshot::Sender, request: ConfirmationRequest, } -pub struct ConfirmationPromise { - id: U256, - result: Arc>, - listeners: Arc>>, -} - -impl ConfirmationToken { - /// Submit solution to all listeners - fn resolve(&self, result: Option) { - let wrapped = result.clone().map_or(ConfirmationResult::Rejected, |h| ConfirmationResult::Confirmed(h)); - { - let mut res = self.result.lock(); - *res = wrapped.clone(); - } - // Notify listener - let listeners = { - let mut listeners = self.listeners.lock(); - mem::replace(&mut *listeners, Vec::new()) - }; - for mut listener in listeners { - listener(result.clone()); - } - } - - fn as_promise(&self) -> ConfirmationPromise { - ConfirmationPromise { - id: self.request.id, - result: self.result.clone(), - listeners: self.listeners.clone(), - } - } -} - -impl ConfirmationPromise { - /// Get the ID for this request. - pub fn id(&self) -> U256 { self.id } - - /// Just get the result, assuming it exists. - pub fn result(&self) -> ConfirmationResult { - self.result.lock().clone() - } - - pub fn wait_for_result(self, callback: F) where F: FnOnce(Option) + Send + 'static { - trace!(target: "own_tx", "Signer: Awaiting confirmation... ({:?}).", self.id); - let _result = self.result.lock(); - let mut listeners = self.listeners.lock(); - // TODO [todr] Overcoming FnBox unstability - let callback = RefCell::new(Some(callback)); - listeners.push(Box::new(move |result| { - let ref mut f = *callback.borrow_mut(); - f.take().expect("Callbacks are called only once.")(result) - })); - } -} - +/// Receiving end of the Confirmation channel; can be used as a `Future` to await for `ConfirmationRequest` +/// being processed and turned into `ConfirmationOutcome` +pub type ConfirmationReceiver = oneshot::Receiver; /// Queue for all unconfirmed requests. #[derive(Default)] pub struct ConfirmationsQueue { id: Mutex, - queue: RwLock>, + queue: RwLock>, on_event: RwLock () + Send + Sync>>>, } @@ -203,23 +134,26 @@ impl ConfirmationsQueue { } } - /// Removes requests from this queue and notifies `ConfirmationPromise` holders about the result. + /// Removes requests from this queue and notifies `ConfirmationReceiver` holder about the result. /// Notifies also a receiver about that event. - fn remove(&self, id: U256, result: Option) -> Option { - let token = self.queue.write().remove(&id); + fn remove(&self, id: U256, result: Option) -> Option { + let sender = self.queue.write().remove(&id); - if let Some(token) = token { + if let Some(sender) = sender { // notify receiver about the event self.notify(result.clone().map_or_else( || QueueEvent::RequestRejected(id), |_| QueueEvent::RequestConfirmed(id) )); - // notify token holders about resolution - token.resolve(result); - // return a result - return Some(token.request.clone()); + + // notify confirmation receiver about resolution + let result = result.ok_or(errors::request_rejected()); + sender.sender.send(result); + + Some(sender.request) + } else { + None } - None } } @@ -230,7 +164,7 @@ impl Drop for ConfirmationsQueue { } impl SigningQueue for ConfirmationsQueue { - fn add_request(&self, request: ConfirmationPayload, origin: Origin) -> Result { + fn add_request(&self, request: ConfirmationPayload, origin: Origin) -> Result<(U256, ConfirmationReceiver), QueueAddError> { if self.len() > QUEUE_LIMIT { return Err(QueueAddError::LimitReached); } @@ -247,16 +181,17 @@ impl SigningQueue for ConfirmationsQueue { trace!(target: "own_tx", "Signer: ({:?}) : {:?}", id, request); let mut queue = self.queue.write(); - queue.insert(id, ConfirmationToken { - result: Arc::new(Mutex::new(ConfirmationResult::Waiting)), - listeners: Default::default(), + let (sender, receiver) = oneshot::oneshot::(); + + queue.insert(id, ConfirmationSender { + sender, request: ConfirmationRequest { - id: id, + id, payload: request, - origin: origin, + origin, }, }); - queue.get(&id).map(|token| token.as_promise()).expect("Token was just inserted.") + (id, receiver) }; // Notify listeners self.notify(QueueEvent::NewRequest(id)); @@ -264,7 +199,7 @@ impl SigningQueue for ConfirmationsQueue { } fn peek(&self, id: &U256) -> Option { - self.queue.read().get(id).map(|token| token.request.clone()) + self.queue.read().get(id).map(|sender| sender.request.clone()) } fn request_rejected(&self, id: U256) -> Option { @@ -272,14 +207,14 @@ impl SigningQueue for ConfirmationsQueue { self.remove(id, None) } - fn request_confirmed(&self, id: U256, result: RpcResult) -> Option { + fn request_confirmed(&self, id: U256, result: ConfirmationResult) -> Option { debug!(target: "own_tx", "Signer: Transaction confirmed ({:?}).", id); self.remove(id, Some(result)) } fn requests(&self) -> Vec { let queue = self.queue.read(); - queue.values().map(|token| token.request.clone()).collect() + queue.values().map(|sender| sender.request.clone()).collect() } fn len(&self) -> usize { @@ -296,13 +231,14 @@ impl SigningQueue for ConfirmationsQueue { #[cfg(test)] mod test { - use std::time::Duration; - use std::thread; - use std::sync::{mpsc, Arc}; + use std::sync::Arc; use bigint::prelude::U256; use util::Address; use parking_lot::Mutex; - use v1::helpers::{SigningQueue, ConfirmationsQueue, QueueEvent, FilledTransactionRequest, ConfirmationPayload}; + use jsonrpc_core::futures::Future; + use v1::helpers::{ + SigningQueue, ConfirmationsQueue, QueueEvent, FilledTransactionRequest, ConfirmationPayload, + }; use v1::types::ConfirmationResponse; fn request() -> ConfirmationPayload { @@ -326,25 +262,12 @@ mod test { let request = request(); // when - let q = queue.clone(); - let handle = thread::spawn(move || { - let v = q.add_request(request, Default::default()).unwrap(); - let (tx, rx) = mpsc::channel(); - v.wait_for_result(move |res| { - tx.send(res).unwrap(); - }); - rx.recv().unwrap().expect("Should return hash") - }); - - let id = U256::from(1); - while queue.peek(&id).is_none() { - // Just wait for the other thread to start - thread::sleep(Duration::from_millis(100)); - } + let (id, future) = queue.add_request(request, Default::default()).unwrap(); queue.request_confirmed(id, Ok(ConfirmationResponse::SendTransaction(1.into()))); // then - assert_eq!(handle.join().expect("Thread should finish nicely"), Ok(ConfirmationResponse::SendTransaction(1.into()))); + let confirmation = future.wait().unwrap(); + assert_eq!(confirmation, Ok(ConfirmationResponse::SendTransaction(1.into()))); } #[test] @@ -359,7 +282,7 @@ mod test { queue.on_event(move |notification| { r.lock().push(notification); }); - queue.add_request(request, Default::default()).unwrap(); + let _future = queue.add_request(request, Default::default()).unwrap(); queue.finish(); // then @@ -376,7 +299,7 @@ mod test { let request = request(); // when - queue.add_request(request.clone(), Default::default()).unwrap(); + let _future = queue.add_request(request.clone(), Default::default()).unwrap(); let all = queue.requests(); // then diff --git a/rpc/src/v1/impls/signing.rs b/rpc/src/v1/impls/signing.rs index 0f1217069..c6a2ce39b 100644 --- a/rpc/src/v1/impls/signing.rs +++ b/rpc/src/v1/impls/signing.rs @@ -23,13 +23,13 @@ use parking_lot::Mutex; use ethcore::account_provider::AccountProvider; -use jsonrpc_core::{BoxFuture, Error}; -use jsonrpc_core::futures::{future, Future}; +use jsonrpc_core::Error; +use jsonrpc_core::futures::{future, BoxFuture, Future, Poll, Async}; use jsonrpc_core::futures::future::Either; use v1::helpers::{ - errors, oneshot, - DefaultAccount, - SIGNING_QUEUE_LIMIT, SigningQueue, ConfirmationPromise, ConfirmationResult, SignerService, + errors, DefaultAccount, SignerService, SigningQueue, + ConfirmationReceiver as RpcConfirmationReceiver, + ConfirmationResult as RpcConfirmationResult, }; use v1::helpers::dispatch::{self, Dispatcher}; use v1::helpers::accounts::unwrap_provider; @@ -45,61 +45,67 @@ use v1::types::{ Origin, }; +use parity_reactor::Remote; + /// After 60s entries that are not queried with `check_request` will get garbage collected. const MAX_PENDING_DURATION_SEC: u32 = 60; -/// Max number of total requests pending and completed, before we start garbage collecting them. -const MAX_TOTAL_REQUESTS: usize = SIGNING_QUEUE_LIMIT; +#[must_use = "futures do nothing unless polled"] enum DispatchResult { - Promise(ConfirmationPromise), + Future(U256, RpcConfirmationReceiver), Value(RpcConfirmationResponse), } +impl Future for DispatchResult { + type Item = RpcConfirmationResponse; + type Error = Error; + + fn poll(&mut self) -> Poll { + match *self { + DispatchResult::Value(ref response) => Ok(Async::Ready(response.clone())), + DispatchResult::Future(_uid, ref mut future) => try_ready!(future.poll()).map(Async::Ready), + } + } +} + +fn schedule(remote: Remote, + confirmations: Arc>>>, + id: U256, + future: RpcConfirmationReceiver) { + { + let mut confirmations = confirmations.lock(); + confirmations.insert(id.clone(), None); + } + + let future = future.then(move |result| { + let mut confirmations = confirmations.lock(); + confirmations.prune(); + let result = result.and_then(|response| response); + confirmations.insert(id, Some(result)); + Ok(()) + }); + remote.spawn(future); +} + /// Implementation of functions that require signing when no trusted signer is used. pub struct SigningQueueClient { signer: Arc, accounts: Option>, dispatcher: D, - pending: Arc>>, -} - -fn handle_dispatch(res: Result, on_response: OnResponse) - where OnResponse: FnOnce(Result) + Send + 'static -{ - match res { - Ok(DispatchResult::Value(result)) => on_response(Ok(result)), - Ok(DispatchResult::Promise(promise)) => { - promise.wait_for_result(move |result| { - on_response(result.unwrap_or_else(|| Err(errors::request_rejected()))) - }) - }, - Err(e) => on_response(Err(e)), - } -} - -fn collect_garbage(map: &mut TransientHashMap) { - map.prune(); - if map.len() > MAX_TOTAL_REQUESTS { - // Remove all non-waiting entries. - let non_waiting: Vec<_> = map - .iter() - .filter(|&(_, val)| val.result() != ConfirmationResult::Waiting) - .map(|(key, _)| *key) - .collect(); - for k in non_waiting { - map.remove(&k); - } - } + remote: Remote, + // None here means that the request hasn't yet been confirmed + confirmations: Arc>>>, } impl SigningQueueClient { /// Creates a new signing queue client given shared signing queue. - pub fn new(signer: &Arc, dispatcher: D, accounts: &Option>) -> Self { + pub fn new(signer: &Arc, dispatcher: D, remote: Remote, accounts: &Option>) -> Self { SigningQueueClient { signer: signer.clone(), accounts: accounts.clone(), - dispatcher: dispatcher, - pending: Arc::new(Mutex::new(TransientHashMap::new(MAX_PENDING_DURATION_SEC))), + dispatcher, + remote, + confirmations: Arc::new(Mutex::new(TransientHashMap::new(MAX_PENDING_DURATION_SEC))), } } @@ -126,7 +132,7 @@ impl SigningQueueClient { } else { Either::B(future::done( signer.add_request(payload, origin) - .map(DispatchResult::Promise) + .map(|(id, future)| DispatchResult::Future(id, future)) .map_err(|_| errors::request_rejected_limit()) )) } @@ -144,35 +150,31 @@ impl ParitySigning for SigningQueueClient { } fn post_sign(&self, meta: Metadata, address: RpcH160, data: RpcBytes) -> BoxFuture, Error> { - let pending = self.pending.clone(); + let remote = self.remote.clone(); + let confirmations = self.confirmations.clone(); + Box::new(self.dispatch( RpcConfirmationPayload::EthSignMessage((address.clone(), data).into()), DefaultAccount::Provided(address.into()), meta.origin ).map(move |result| match result { DispatchResult::Value(v) => RpcEither::Or(v), - DispatchResult::Promise(promise) => { - let id = promise.id(); - let mut pending = pending.lock(); - collect_garbage(&mut pending); - pending.insert(id, promise); - + DispatchResult::Future(id, future) => { + schedule(remote, confirmations, id, future); RpcEither::Either(id.into()) }, })) } fn post_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture, Error> { - let pending = self.pending.clone(); - Box::new(self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.dapp_id().into(), meta.origin) - .map(move |result| match result { - DispatchResult::Value(v) => RpcEither::Or(v), - DispatchResult::Promise(promise) => { - let id = promise.id(); - let mut pending = pending.lock(); - collect_garbage(&mut pending); - pending.insert(id, promise); + let remote = self.remote.clone(); + let confirmations = self.confirmations.clone(); + Box::new(self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.dapp_id().into(), meta.origin) + .map(|result| match result { + DispatchResult::Value(v) => RpcEither::Or(v), + DispatchResult::Future(id, future) => { + schedule(remote, confirmations, id, future); RpcEither::Either(id.into()) }, })) @@ -180,13 +182,10 @@ impl ParitySigning for SigningQueueClient { fn check_request(&self, id: RpcU256) -> Result, Error> { let id: U256 = id.into(); - match self.pending.lock().get(&id) { - Some(ref promise) => match promise.result() { - ConfirmationResult::Waiting => Ok(None), - ConfirmationResult::Rejected => Err(errors::request_rejected()), - ConfirmationResult::Confirmed(rpc_response) => rpc_response.map(Some), - }, - _ => Err(errors::request_not_found()), + match self.confirmations.lock().get(&id) { + None => Err(errors::request_not_found()), // Request info has been dropped, or even never been there + Some(&None) => Ok(None), // No confirmation yet, request is known, confirmation is pending + Some(&Some(ref confirmation)) => confirmation.clone().map(Some), // Confirmation is there } } @@ -197,20 +196,12 @@ impl ParitySigning for SigningQueueClient { meta.origin, ); - let (ready, p) = oneshot::oneshot(); - - // when dispatch is complete - Box::new(res.then(move |res| { - // register callback via the oneshot sender. - handle_dispatch(res, move |response| { - match response { - Ok(RpcConfirmationResponse::Decrypt(data)) => ready.send(Ok(data)), - Err(e) => ready.send(Err(e)), - e => ready.send(Err(errors::internal("Unexpected result.", e))), - } - }); - - p + // when dispatch is complete - wait for result and then + Box::new(res.flatten().and_then(move |response| { + match response { + RpcConfirmationResponse::Decrypt(data) => Ok(data), + e => Err(errors::internal("Unexpected result.", e)), + } })) } } @@ -225,18 +216,11 @@ impl EthSigning for SigningQueueClient { meta.origin, ); - let (ready, p) = oneshot::oneshot(); - - Box::new(res.then(move |res| { - handle_dispatch(res, move |response| { - match response { - Ok(RpcConfirmationResponse::Signature(sig)) => ready.send(Ok(sig)), - Err(e) => ready.send(Err(e)), - e => ready.send(Err(errors::internal("Unexpected result.", e))), - } - }); - - p + Box::new(res.flatten().and_then(move |response| { + match response { + RpcConfirmationResponse::Signature(sig) => Ok(sig), + e => Err(errors::internal("Unexpected result.", e)), + } })) } @@ -247,18 +231,11 @@ impl EthSigning for SigningQueueClient { meta.origin, ); - let (ready, p) = oneshot::oneshot(); - - Box::new(res.then(move |res| { - handle_dispatch(res, move |response| { - match response { - Ok(RpcConfirmationResponse::SendTransaction(hash)) => ready.send(Ok(hash)), - Err(e) => ready.send(Err(e)), - e => ready.send(Err(errors::internal("Unexpected result.", e))), - } - }); - - p + Box::new(res.flatten().and_then(move |response| { + match response { + RpcConfirmationResponse::SendTransaction(hash) => Ok(hash), + e => Err(errors::internal("Unexpected result.", e)), + } })) } @@ -269,18 +246,11 @@ impl EthSigning for SigningQueueClient { meta.origin, ); - let (ready, p) = oneshot::oneshot(); - - Box::new(res.then(move |res| { - handle_dispatch(res, move |response| { - match response { - Ok(RpcConfirmationResponse::SignTransaction(tx)) => ready.send(Ok(tx)), - Err(e) => ready.send(Err(e)), - e => ready.send(Err(errors::internal("Unexpected result.", e))), - } - }); - - p + Box::new(res.flatten().and_then(move |response| { + match response { + RpcConfirmationResponse::SignTransaction(tx) => Ok(tx), + e => Err(errors::internal("Unexpected result.", e)), + } })) } } diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index 97bfb9eec..88962f09c 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -75,9 +75,9 @@ impl SyncProvider for TestSyncProvider { vec![ PeerInfo { id: Some("node1".to_owned()), - client_version: "Parity/1".to_owned(), + client_version: "Parity/1".to_owned(), capabilities: vec!["eth/62".to_owned(), "eth/63".to_owned()], - remote_address: "127.0.0.1:7777".to_owned(), + remote_address: "127.0.0.1:7777".to_owned(), local_address: "127.0.0.1:8888".to_owned(), eth_info: Some(EthProtocolInfo { version: 62, @@ -88,9 +88,9 @@ impl SyncProvider for TestSyncProvider { }, PeerInfo { id: None, - client_version: "Parity/2".to_owned(), + client_version: "Parity/2".to_owned(), capabilities: vec!["eth/63".to_owned(), "eth/64".to_owned()], - remote_address: "Handshake".to_owned(), + remote_address: "Handshake".to_owned(), local_address: "127.0.0.1:3333".to_owned(), eth_info: Some(EthProtocolInfo { version: 64, diff --git a/rpc/src/v1/tests/mocked/signer.rs b/rpc/src/v1/tests/mocked/signer.rs index 95095cb72..0211b5f24 100644 --- a/rpc/src/v1/tests/mocked/signer.rs +++ b/rpc/src/v1/tests/mocked/signer.rs @@ -80,7 +80,7 @@ fn signer_tester() -> SignerTester { fn should_return_list_of_items_to_confirm() { // given let tester = signer_tester(); - tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { + let _send_future = tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { from: Address::from(1), used_default_from: false, to: Some(Address::from_str("d46e8dd67c5d32be8058bb8eb970870f07244567").unwrap()), @@ -91,7 +91,7 @@ fn should_return_list_of_items_to_confirm() { nonce: None, condition: None, }), Origin::Dapps("http://parity.io".into())).unwrap(); - tester.signer.add_request(ConfirmationPayload::EthSignMessage(1.into(), vec![5].into()), Origin::Unknown).unwrap(); + let _sign_future = tester.signer.add_request(ConfirmationPayload::EthSignMessage(1.into(), vec![5].into()), Origin::Unknown).unwrap(); // when let request = r#"{"jsonrpc":"2.0","method":"signer_requestsToConfirm","params":[],"id":1}"#; @@ -111,7 +111,7 @@ fn should_return_list_of_items_to_confirm() { fn should_reject_transaction_from_queue_without_dispatching() { // given let tester = signer_tester(); - tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { + let _confirmation_future = tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { from: Address::from(1), used_default_from: false, to: Some(Address::from_str("d46e8dd67c5d32be8058bb8eb970870f07244567").unwrap()), @@ -138,7 +138,7 @@ fn should_reject_transaction_from_queue_without_dispatching() { fn should_not_remove_transaction_if_password_is_invalid() { // given let tester = signer_tester(); - tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { + let _confirmation_future = tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { from: Address::from(1), used_default_from: false, to: Some(Address::from_str("d46e8dd67c5d32be8058bb8eb970870f07244567").unwrap()), @@ -164,7 +164,7 @@ fn should_not_remove_transaction_if_password_is_invalid() { fn should_not_remove_sign_if_password_is_invalid() { // given let tester = signer_tester(); - tester.signer.add_request(ConfirmationPayload::EthSignMessage(0.into(), vec![5].into()), Origin::Unknown).unwrap(); + let _confirmation_future = tester.signer.add_request(ConfirmationPayload::EthSignMessage(0.into(), vec![5].into()), Origin::Unknown).unwrap(); assert_eq!(tester.signer.requests().len(), 1); // when @@ -182,7 +182,7 @@ fn should_confirm_transaction_and_dispatch() { let tester = signer_tester(); let address = tester.accounts.new_account("test").unwrap(); let recipient = Address::from_str("d46e8dd67c5d32be8058bb8eb970870f07244567").unwrap(); - tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { + let _confirmation_future = tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { from: address, used_default_from: false, to: Some(recipient), @@ -228,7 +228,7 @@ fn should_alter_the_sender_and_nonce() { //// given let tester = signer_tester(); let recipient = Address::from_str("d46e8dd67c5d32be8058bb8eb970870f07244567").unwrap(); - tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { + let _confirmation_future = tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { from: 0.into(), used_default_from: false, to: Some(recipient), @@ -278,7 +278,7 @@ fn should_confirm_transaction_with_token() { let tester = signer_tester(); let address = tester.accounts.new_account("test").unwrap(); let recipient = Address::from_str("d46e8dd67c5d32be8058bb8eb970870f07244567").unwrap(); - tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { + let _confirmation_future = tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { from: address, used_default_from: false, to: Some(recipient), @@ -327,7 +327,7 @@ fn should_confirm_transaction_with_rlp() { let tester = signer_tester(); let address = tester.accounts.new_account("test").unwrap(); let recipient = Address::from_str("d46e8dd67c5d32be8058bb8eb970870f07244567").unwrap(); - tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { + let _confirmation_future = tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { from: address, used_default_from: false, to: Some(recipient), @@ -374,7 +374,7 @@ fn should_return_error_when_sender_does_not_match() { let tester = signer_tester(); let address = tester.accounts.new_account("test").unwrap(); let recipient = Address::from_str("d46e8dd67c5d32be8058bb8eb970870f07244567").unwrap(); - tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { + let _confirmation_future = tester.signer.add_request(ConfirmationPayload::SendTransaction(FilledTransactionRequest { from: Address::default(), used_default_from: false, to: Some(recipient), @@ -421,7 +421,7 @@ fn should_confirm_sign_transaction_with_rlp() { let tester = signer_tester(); let address = tester.accounts.new_account("test").unwrap(); let recipient = Address::from_str("d46e8dd67c5d32be8058bb8eb970870f07244567").unwrap(); - tester.signer.add_request(ConfirmationPayload::SignTransaction(FilledTransactionRequest { + let _confirmation_future = tester.signer.add_request(ConfirmationPayload::SignTransaction(FilledTransactionRequest { from: address, used_default_from: false, to: Some(recipient), @@ -485,7 +485,7 @@ fn should_confirm_data_sign_with_signature() { // given let tester = signer_tester(); let address = tester.accounts.new_account("test").unwrap(); - tester.signer.add_request(ConfirmationPayload::EthSignMessage( + let _confirmation_future = tester.signer.add_request(ConfirmationPayload::EthSignMessage( address, vec![1, 2, 3, 4].into(), ), Origin::Unknown).unwrap(); @@ -515,7 +515,7 @@ fn should_confirm_decrypt_with_phrase() { // given let tester = signer_tester(); let address = tester.accounts.new_account("test").unwrap(); - tester.signer.add_request(ConfirmationPayload::Decrypt( + let _confirmation_future = tester.signer.add_request(ConfirmationPayload::Decrypt( address, vec![1, 2, 3, 4].into(), ), Origin::Unknown).unwrap(); diff --git a/rpc/src/v1/tests/mocked/signing.rs b/rpc/src/v1/tests/mocked/signing.rs index a767bfb77..41c8ec7a0 100644 --- a/rpc/src/v1/tests/mocked/signing.rs +++ b/rpc/src/v1/tests/mocked/signing.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +use std::thread; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -39,6 +40,8 @@ use ethcore::transaction::{Transaction, Action, SignedTransaction}; use ethstore::ethkey::{Generator, Random}; use serde_json; +use parity_reactor::Remote; + struct SigningTester { pub signer: Arc, pub client: Arc, @@ -58,9 +61,11 @@ impl Default for SigningTester { let dispatcher = FullDispatcher::new(client.clone(), miner.clone()); - let rpc = SigningQueueClient::new(&signer, dispatcher.clone(), &opt_accounts); + let remote = Remote::new_thread_per_future(); + + let rpc = SigningQueueClient::new(&signer, dispatcher.clone(), remote.clone(), &opt_accounts); io.extend_with(EthSigning::to_delegate(rpc)); - let rpc = SigningQueueClient::new(&signer, dispatcher, &opt_accounts); + let rpc = SigningQueueClient::new(&signer, dispatcher, remote, &opt_accounts); io.extend_with(ParitySigning::to_delegate(rpc)); SigningTester { @@ -184,6 +189,9 @@ fn should_check_status_of_request_when_its_resolved() { tester.io.handle_request_sync(&request).expect("Sent"); tester.signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Signature(1.into()))); + // This is not ideal, but we need to give futures some time to be executed, and they need to run in a separate thread + thread::sleep(Duration::from_millis(20)); + // when let request = r#"{ "jsonrpc": "2.0", diff --git a/rpc/src/v1/types/confirmations.rs b/rpc/src/v1/types/confirmations.rs index fc4a0e303..5dcb11316 100644 --- a/rpc/src/v1/types/confirmations.rs +++ b/rpc/src/v1/types/confirmations.rs @@ -47,13 +47,13 @@ impl From for ConfirmationRequest { } impl fmt::Display for ConfirmationRequest { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "#{}: {} coming from {}", self.id, self.payload, self.origin) } } impl fmt::Display for ConfirmationPayload { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { ConfirmationPayload::SendTransaction(ref transaction) => write!(f, "{}", transaction), ConfirmationPayload::SignTransaction(ref transaction) => write!(f, "(Sign only) {}", transaction), @@ -83,7 +83,7 @@ impl From<(H160, Bytes)> for SignRequest { } impl fmt::Display for SignRequest { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "sign 0x{} with {}", @@ -113,7 +113,7 @@ impl From<(H160, Bytes)> for DecryptRequest { } impl fmt::Display for DecryptRequest { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "decrypt data with {}", diff --git a/rpc/src/v1/types/transaction_request.rs b/rpc/src/v1/types/transaction_request.rs index 7f1c16287..2d4c86c7e 100644 --- a/rpc/src/v1/types/transaction_request.rs +++ b/rpc/src/v1/types/transaction_request.rs @@ -62,7 +62,7 @@ pub fn format_ether(i: U256) -> String { } impl fmt::Display for TransactionRequest { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let eth = self.value.unwrap_or(U256::from(0)); match self.to { Some(ref to) => write!(