diff --git a/rpc/src/v1/helpers/signing_queue.rs b/rpc/src/v1/helpers/signing_queue.rs index b2e89522d..65aef8a33 100644 --- a/rpc/src/v1/helpers/signing_queue.rs +++ b/rpc/src/v1/helpers/signing_queue.rs @@ -14,10 +14,34 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::sync::Mutex; +use std::thread; +use std::time::{Instant, Duration}; +use std::sync::{mpsc, Mutex, RwLock}; use std::collections::HashMap; use v1::types::{TransactionRequest, TransactionConfirmation}; -use util::U256; +use util::{U256, H256}; + + +/// Messages that queue informs about +#[derive(Debug, PartialEq)] +pub enum QueueMessage { + /// Receiver should stop work upon receiving `Finish` message. + Finish, + /// Informs about new transaction request. + NewRequest(U256), +} + +/// Defines possible errors returned from queue receiving method. +#[derive(Debug, PartialEq)] +pub enum QueueError { + /// Returned when method has been already used (no receiver available). + AlreadyUsed, + /// Returned when receiver encounters an error. + ReceiverError(mpsc::RecvError), +} + +/// Message Receiver type +pub type QueueMessageReceiver = mpsc::Receiver; /// A queue of transactions awaiting to be confirmed and signed. pub trait SigningQueue: Send + Sync { @@ -25,17 +49,106 @@ pub trait SigningQueue: Send + Sync { fn add_request(&self, transaction: TransactionRequest) -> U256; /// Remove request from the queue. - fn remove_request(&self, id: U256) -> Option; + /// Notify possible waiters that transaction was rejected. + fn request_rejected(&self, id: U256) -> Option; + + /// Remove request from the queue. + /// Notify possible waiters that transaction was confirmed and got given hash. + fn request_confirmed(&self, id: U256, hash: H256) -> Option; + + /// Returns a request if it is contained in the queue. + fn peek(&self, id: &U256) -> Option; /// Return copy of all the requests in the queue. fn requests(&self) -> Vec; + + /// Blocks for some time waiting for confirmation. + /// Returns `None` when timeout reached or transaction was rejected. + /// Returns transaction hash when transaction was confirmed. + fn wait_with_timeout(&self, id: U256) -> Option; +} + +/// Time you need to confirm the transaction in UI. +/// Unless we have a multi-threaded RPC this will lock +/// any other incoming call! +const QUEUE_TIMEOUT_DURATION_SEC : u64 = 20; + +#[derive(Debug, Clone)] +enum QueueStatus { + Waiting, + Rejected, + Confirmed(H256), } /// Queue for all unconfirmed transactions. -#[derive(Default)] pub struct ConfirmationsQueue { id: Mutex, - queue: Mutex>, + waiters: RwLock>, + queue: RwLock>, + sender: Mutex>, + receiver: Mutex>>, +} + +impl Default for ConfirmationsQueue { + fn default() -> Self { + let (send, recv) = mpsc::channel(); + + ConfirmationsQueue { + id: Mutex::new(U256::from(0)), + waiters: RwLock::new(HashMap::new()), + queue: RwLock::new(HashMap::new()), + sender: Mutex::new(send), + receiver: Mutex::new(Some(recv)), + } + } +} + +impl ConfirmationsQueue { + /// Blocks the thread and starts listening for notifications. + /// For each event `listener` callback function will be invoked. + /// This method can be used only once. + pub fn start_listening(&self, listener: F) -> Result<(), QueueError> + where F: Fn(QueueMessage) -> () { + let recv = self.receiver.lock().unwrap().take(); + if let None = recv { + return Err(QueueError::AlreadyUsed); + } + let recv = recv.expect("Check for none is done earlier."); + + loop { + let message = try!(recv.recv().map_err(|e| QueueError::ReceiverError(e))); + if let QueueMessage::Finish = message { + return Ok(()); + } + + listener(message); + } + } + + /// Notifies receiver that the communcation is over. + pub fn finish(&self) { + self.notify(QueueMessage::Finish); + } + + fn notify(&self, message: QueueMessage) { + // We don't really care about the result + let _ = self.sender.lock().unwrap().send(message); + } + + fn remove(&self, id: U256) -> Option { + self.queue.write().unwrap().remove(&id) + } + + fn update_status(&self, id: U256, status: QueueStatus) { + let mut waiters = self.waiters.write().unwrap(); + waiters.insert(id, status); + } +} + +impl Drop for ConfirmationsQueue { + fn drop(&mut self) { + self.finish(); + } } impl SigningQueue for ConfirmationsQueue { @@ -46,38 +159,98 @@ impl SigningQueue for ConfirmationsQueue { *last_id = *last_id + U256::from(1); *last_id }; - let mut queue = self.queue.lock().unwrap(); - queue.insert(id, TransactionConfirmation { - id: id, - transaction: transaction, - }); + // Add request to queue + { + let mut queue = self.queue.write().unwrap(); + queue.insert(id, TransactionConfirmation { + id: id, + transaction: transaction, + }); + } + // Notify listeners + self.notify(QueueMessage::NewRequest(id)); id } - fn remove_request(&self, id: U256) -> Option { - self.queue.lock().unwrap().remove(&id) + fn peek(&self, id: &U256) -> Option { + self.queue.read().unwrap().get(id).cloned() + } + + fn request_rejected(&self, id: U256) -> Option { + let o = self.remove(id); + self.update_status(id, QueueStatus::Rejected); + o + } + + fn request_confirmed(&self, id: U256, hash: H256) -> Option { + let o = self.remove(id); + self.update_status(id, QueueStatus::Confirmed(hash)); + o } fn requests(&self) -> Vec { - let queue = self.queue.lock().unwrap(); + let queue = self.queue.read().unwrap(); queue.values().cloned().collect() } + + fn wait_with_timeout(&self, id: U256) -> Option { + { + let mut waiters = self.waiters.write().unwrap(); + let r = waiters.insert(id, QueueStatus::Waiting); + match r { + // This is ok, we can have many waiters + Some(QueueStatus::Waiting) | None => {}, + // There already was a response for someone. + // The one waiting for it will cleanup, so... + Some(v) => { + // ... insert old status back + waiters.insert(id, v.clone()); + if let QueueStatus::Confirmed(h) = v { + return Some(h); + } + return None; + }, + } + } + + // Now wait for a response + let deadline = Instant::now() + Duration::from_secs(QUEUE_TIMEOUT_DURATION_SEC); + while Instant::now() < deadline { + let status = { + let waiters = self.waiters.read().unwrap(); + waiters.get(&id).expect("Only the waiting thread can remove any message.").clone() + }; + + match status { + QueueStatus::Waiting => thread::sleep(Duration::from_millis(50)), + QueueStatus::Confirmed(h) => { + self.waiters.write().unwrap().remove(&id); + return Some(h); + }, + QueueStatus::Rejected => { + self.waiters.write().unwrap().remove(&id); + return None; + }, + } + } + // We reached the timeout. Just return `None` + None + } } #[cfg(test)] mod test { + use std::time::Duration; + use std::thread; + use std::sync::{Arc, Mutex}; use util::hash::Address; - use util::numbers::U256; + use util::numbers::{U256, H256}; use v1::types::TransactionRequest; use super::*; - #[test] - fn should_work_for_hashset() { - // given - let queue = ConfirmationsQueue::default(); - - let request = TransactionRequest { + fn request() -> TransactionRequest { + TransactionRequest { from: Address::from(1), to: Some(Address::from(2)), gas_price: None, @@ -85,7 +258,63 @@ mod test { value: Some(U256::from(10_000_000)), data: None, nonce: None, - }; + } + } + + #[test] + fn should_wait_for_hash() { + // given + let queue = Arc::new(ConfirmationsQueue::default()); + let request = request(); + + // when + let q = queue.clone(); + let handle = thread::spawn(move || { + let v = q.add_request(request); + q.wait_with_timeout(v).expect("Should return hash") + }); + + let id = U256::from(1); + while queue.peek(&id).is_none() { + // Just wait for the other thread to start + thread::sleep(Duration::from_millis(100)); + } + queue.request_confirmed(id, H256::from(1)); + + // then + assert_eq!(handle.join().expect("Thread should finish nicely"), H256::from(1)); + } + + #[test] + fn should_receive_notification() { + // given + let received = Arc::new(Mutex::new(None)); + let queue = Arc::new(ConfirmationsQueue::default()); + let request = request(); + + // when + let q = queue.clone(); + let r = received.clone(); + let handle = thread::spawn(move || { + q.start_listening(move |notification| { + let mut v = r.lock().unwrap(); + *v = Some(notification); + }).expect("Should be closed nicely.") + }); + let v = queue.add_request(request); + queue.finish(); + + // then + handle.join().expect("Thread should finish nicely"); + let r = received.lock().unwrap().take(); + assert_eq!(r, Some(QueueMessage::NewRequest(v))); + } + + #[test] + fn should_add_transactions() { + // given + let queue = ConfirmationsQueue::default(); + let request = request(); // when queue.add_request(request.clone()); diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index f7c34ffa7..44bdef243 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -497,7 +497,7 @@ impl Eth for EthClient where .and_then(|(raw_transaction, )| { let raw_transaction = raw_transaction.to_vec(); match UntrustedRlp::new(&raw_transaction).as_val() { - Ok(signed_transaction) => dispatch_transaction(&*take_weak!(self.client), &*take_weak!(self.miner), signed_transaction), + Ok(signed_transaction) => to_value(&dispatch_transaction(&*take_weak!(self.client), &*take_weak!(self.miner), signed_transaction)), Err(_) => to_value(&H256::zero()), } }) diff --git a/rpc/src/v1/impls/eth_signing.rs b/rpc/src/v1/impls/eth_signing.rs index f4a972fb5..2ecc00f05 100644 --- a/rpc/src/v1/impls/eth_signing.rs +++ b/rpc/src/v1/impls/eth_signing.rs @@ -47,9 +47,9 @@ impl EthSigning for EthSigningQueueClient { from_params::<(TransactionRequest, )>(params) .and_then(|(request, )| { let queue = take_weak!(self.queue); - queue.add_request(request); - // TODO [ToDr] Block and wait for confirmation? - to_value(&H256::zero()) + let id = queue.add_request(request); + let result = queue.wait_with_timeout(id); + to_value(&result.unwrap_or_else(H256::new)) }) } } @@ -90,7 +90,7 @@ impl EthSigning for EthSigningUnsafeClient where .and_then(|(request, )| { let accounts = take_weak!(self.accounts); match accounts.account_secret(&request.from) { - Ok(secret) => sign_and_dispatch(&self.client, &self.miner, request, secret), + Ok(secret) => to_value(&sign_and_dispatch(&*take_weak!(self.client), &*take_weak!(self.miner), request, secret)), Err(_) => to_value(&H256::zero()) } }) diff --git a/rpc/src/v1/impls/mod.rs b/rpc/src/v1/impls/mod.rs index d3a9b70a2..4bf5c88e7 100644 --- a/rpc/src/v1/impls/mod.rs +++ b/rpc/src/v1/impls/mod.rs @@ -48,16 +48,14 @@ pub use self::traces::TracesClient; pub use self::rpc::RpcClient; use v1::types::TransactionRequest; -use std::sync::Weak; use ethminer::{AccountDetails, MinerService}; use ethcore::client::BlockChainClient; use ethcore::transaction::{Action, SignedTransaction, Transaction}; use util::numbers::*; use util::rlp::encode; use util::bytes::ToPretty; -use jsonrpc_core::{Error, to_value, Value}; -fn dispatch_transaction(client: &C, miner: &M, signed_transaction: SignedTransaction) -> Result +fn dispatch_transaction(client: &C, miner: &M, signed_transaction: SignedTransaction) -> H256 where C: BlockChainClient, M: MinerService { let hash = signed_transaction.hash(); @@ -71,18 +69,16 @@ fn dispatch_transaction(client: &C, miner: &M, signed_transaction: SignedT }; match import { - Ok(_) => to_value(&hash), + Ok(_) => hash, Err(e) => { warn!("Error sending transaction: {:?}", e); - to_value(&H256::zero()) + H256::zero() } } } -fn sign_and_dispatch(client: &Weak, miner: &Weak, request: TransactionRequest, secret: H256) -> Result +fn sign_and_dispatch(client: &C, miner: &M, request: TransactionRequest, secret: H256) -> H256 where C: BlockChainClient, M: MinerService { - let client = take_weak!(client); - let miner = take_weak!(miner); let signed_transaction = { Transaction { diff --git a/rpc/src/v1/impls/personal.rs b/rpc/src/v1/impls/personal.rs index 30d541772..4a419f1e3 100644 --- a/rpc/src/v1/impls/personal.rs +++ b/rpc/src/v1/impls/personal.rs @@ -83,7 +83,7 @@ impl Personal for PersonalClient .and_then(|(request, password)| { let accounts = take_weak!(self.accounts); match accounts.locked_account_secret(&request.from, &password) { - Ok(secret) => sign_and_dispatch(&self.client, &self.miner, request, secret), + Ok(secret) => to_value(&sign_and_dispatch(&*take_weak!(self.client), &*take_weak!(self.miner), request, secret)), Err(_) => to_value(&H256::zero()), } }) diff --git a/rpc/src/v1/impls/personal_signer.rs b/rpc/src/v1/impls/personal_signer.rs index cf4e927ac..9fd197db2 100644 --- a/rpc/src/v1/impls/personal_signer.rs +++ b/rpc/src/v1/impls/personal_signer.rs @@ -63,19 +63,27 @@ impl PersonalSigner for SignerClient Some(sign_and_dispatch(&self.client, &self.miner, request, secret)), + Ok(secret) => { + let hash = sign_and_dispatch(&*client, &*miner, request, secret); + queue.request_confirmed(id, hash); + Some(to_value(&hash)) + }, Err(_) => None } }) - .unwrap_or_else(|| to_value(&H256::zero())) + .unwrap_or_else(|| { + queue.request_rejected(id); + to_value(&H256::zero()) + }) } ) } @@ -84,7 +92,7 @@ impl PersonalSigner for SignerClient(params).and_then( |(id, )| { let queue = take_weak!(self.queue); - let res = queue.remove_request(id); + let res = queue.request_rejected(id); to_value(&res.is_some()) } )