diff --git a/rpc/src/v1/helpers/mod.rs b/rpc/src/v1/helpers/mod.rs index 97b96675e..ce2babd07 100644 --- a/rpc/src/v1/helpers/mod.rs +++ b/rpc/src/v1/helpers/mod.rs @@ -44,7 +44,7 @@ pub use self::requests::{ TransactionRequest, FilledTransactionRequest, ConfirmationRequest, ConfirmationPayload, CallRequest, }; pub use self::signing_queue::{ - ConfirmationsQueue, ConfirmationReceiver, ConfirmationResult, + ConfirmationsQueue, ConfirmationReceiver, ConfirmationResult, ConfirmationSender, SigningQueue, QueueEvent, DefaultAccount, QUEUE_LIMIT as SIGNING_QUEUE_LIMIT, }; diff --git a/rpc/src/v1/helpers/signing_queue.rs b/rpc/src/v1/helpers/signing_queue.rs index c6a804882..17b26b01e 100644 --- a/rpc/src/v1/helpers/signing_queue.rs +++ b/rpc/src/v1/helpers/signing_queue.rs @@ -75,16 +75,17 @@ pub trait SigningQueue: Send + Sync { /// `ConfirmationReceiver` is a `Future` awaiting for resolution of the given request. fn add_request(&self, request: ConfirmationPayload, origin: Origin) -> Result<(U256, ConfirmationReceiver), QueueAddError>; - /// Removes a request from the queue. /// Notifies possible token holders that request was rejected. - fn request_rejected(&self, id: U256) -> Option; + fn request_rejected(&self, sender: ConfirmationSender) -> Option; - /// Removes a request from the queue. /// Notifies possible token holders that request was confirmed and given hash was assigned. - fn request_confirmed(&self, id: U256, result: ConfirmationResult) -> Option; + fn request_confirmed(&self, sender: ConfirmationSender, result: ConfirmationResult) -> Option; - /// Returns a request if it is contained in the queue. - fn peek(&self, id: &U256) -> Option; + /// Put a request taken from `SigningQueue::take` back to the queue. + fn request_untouched(&self, sender: ConfirmationSender); + + /// Returns and removes a request if it is contained in the queue. + fn take(&self, id: &U256) -> Option; /// Return copy of all the requests in the queue. fn requests(&self) -> Vec; @@ -96,9 +97,12 @@ pub trait SigningQueue: Send + Sync { fn is_empty(&self) -> bool; } -struct ConfirmationSender { +/// Confirmation request information with result notifier. +pub struct ConfirmationSender { + /// Confirmation request information. + pub request: ConfirmationRequest, + sender: oneshot::Sender, - request: ConfirmationRequest, } /// Receiving end of the Confirmation channel; can be used as a `Future` to await for `ConfirmationRequest` @@ -122,36 +126,29 @@ impl ConfirmationsQueue { /// Notifies consumer that the communcation is over. /// No more events will be sent after this function is invoked. pub fn finish(&self) { - self.notify(QueueEvent::Finish); + self.notify_message(QueueEvent::Finish); self.on_event.write().clear(); } - /// Notifies receiver about the event happening in this queue. - fn notify(&self, message: QueueEvent) { - for listener in &*self.on_event.read() { - listener(message.clone()) - } + /// Notifies `ConfirmationReceiver` holder about the result given a request. + fn notify_result(&self, sender: ConfirmationSender, result: Option) -> Option { + // notify receiver about the event + self.notify_message(result.clone().map_or_else( + || QueueEvent::RequestRejected(sender.request.id), + |_| QueueEvent::RequestConfirmed(sender.request.id) + )); + + // notify confirmation receiver about resolution + let result = result.ok_or(errors::request_rejected()); + sender.sender.send(result); + + Some(sender.request) } - /// Removes requests from this queue and notifies `ConfirmationReceiver` holder about the result. - /// Notifies also a receiver about that event. - fn remove(&self, id: U256, result: Option) -> Option { - let sender = self.queue.write().remove(&id); - - if let Some(sender) = sender { - // notify receiver about the event - self.notify(result.clone().map_or_else( - || QueueEvent::RequestRejected(id), - |_| QueueEvent::RequestConfirmed(id) - )); - - // notify confirmation receiver about resolution - let result = result.ok_or(errors::request_rejected()); - sender.sender.send(result); - - Some(sender.request) - } else { - None + /// Notifies receiver about the event happening in this queue. + fn notify_message(&self, message: QueueEvent) { + for listener in &*self.on_event.read() { + listener(message.clone()) } } } @@ -193,22 +190,26 @@ impl SigningQueue for ConfirmationsQueue { (id, receiver) }; // Notify listeners - self.notify(QueueEvent::NewRequest(id)); + self.notify_message(QueueEvent::NewRequest(id)); Ok(res) } - fn peek(&self, id: &U256) -> Option { - self.queue.read().get(id).map(|sender| sender.request.clone()) + fn take(&self, id: &U256) -> Option { + self.queue.write().remove(id) } - fn request_rejected(&self, id: U256) -> Option { - debug!(target: "own_tx", "Signer: Request rejected ({:?}).", id); - self.remove(id, None) + fn request_rejected(&self, sender: ConfirmationSender) -> Option { + debug!(target: "own_tx", "Signer: Request rejected ({:?}).", sender.request.id); + self.notify_result(sender, None) } - fn request_confirmed(&self, id: U256, result: ConfirmationResult) -> Option { - debug!(target: "own_tx", "Signer: Transaction confirmed ({:?}).", id); - self.remove(id, Some(result)) + fn request_confirmed(&self, sender: ConfirmationSender, result: ConfirmationResult) -> Option { + debug!(target: "own_tx", "Signer: Request confirmed ({:?}).", sender.request.id); + self.notify_result(sender, Some(result)) + } + + fn request_untouched(&self, sender: ConfirmationSender) { + self.queue.write().insert(sender.request.id, sender); } fn requests(&self) -> Vec { @@ -260,7 +261,8 @@ mod test { // when let (id, future) = queue.add_request(request, Default::default()).unwrap(); - queue.request_confirmed(id, Ok(ConfirmationResponse::SendTransaction(1.into()))); + let sender = queue.take(&id).unwrap(); + queue.request_confirmed(sender, Ok(ConfirmationResponse::SendTransaction(1.into()))); // then let confirmation = future.wait().unwrap(); diff --git a/rpc/src/v1/impls/signer.rs b/rpc/src/v1/impls/signer.rs index e679388cb..58c48bb32 100644 --- a/rpc/src/v1/impls/signer.rs +++ b/rpc/src/v1/impls/signer.rs @@ -86,11 +86,11 @@ impl SignerClient { let dispatcher = self.dispatcher.clone(); let signer = self.signer.clone(); - Box::new(signer.peek(&id).map(|confirmation| { - let mut payload = confirmation.payload.clone(); + Box::new(signer.take(&id).map(|sender| { + let mut payload = sender.request.payload.clone(); // Modify payload if let ConfirmationPayload::SendTransaction(ref mut request) = payload { - if let Some(sender) = modification.sender.clone() { + if let Some(sender) = modification.sender { request.from = sender.into(); // Altering sender should always reset the nonce. request.nonce = None; @@ -109,7 +109,9 @@ impl SignerClient { Either::A(fut.into_future().then(move |result| { // Execute if let Ok(ref response) = result { - signer.request_confirmed(id, Ok((*response).clone())); + signer.request_confirmed(sender, Ok((*response).clone())); + } else { + signer.request_untouched(sender); } result @@ -188,8 +190,9 @@ impl Signer for SignerClient { fn confirm_request_raw(&self, id: U256, bytes: Bytes) -> Result { let id = id.into(); - self.signer.peek(&id).map(|confirmation| { - let result = match confirmation.payload { + self.signer.take(&id).map(|sender| { + let payload = sender.request.payload.clone(); + let result = match payload { ConfirmationPayload::SendTransaction(request) => { Self::verify_transaction(bytes, request, |pending_transaction| { self.dispatcher.dispatch_transaction(pending_transaction) @@ -218,14 +221,16 @@ impl Signer for SignerClient { }, }; if let Ok(ref response) = result { - self.signer.request_confirmed(id, Ok(response.clone())); + self.signer.request_confirmed(sender, Ok(response.clone())); + } else { + self.signer.request_untouched(sender); } result }).unwrap_or_else(|| Err(errors::invalid_params("Unknown RequestID", id))) } fn reject_request(&self, id: U256) -> Result { - let res = self.signer.request_rejected(id.into()); + let res = self.signer.take(&id.into()).map(|sender| self.signer.request_rejected(sender)); Ok(res.is_some()) } diff --git a/rpc/src/v1/tests/mocked/signing.rs b/rpc/src/v1/tests/mocked/signing.rs index ba9fa6d4b..42d20bbf8 100644 --- a/rpc/src/v1/tests/mocked/signing.rs +++ b/rpc/src/v1/tests/mocked/signing.rs @@ -109,7 +109,8 @@ fn should_add_sign_to_queue() { ::std::thread::spawn(move || loop { if signer.requests().len() == 1 { // respond - signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Signature(0.into()))); + let sender = signer.take(&1.into()).unwrap(); + signer.request_confirmed(sender, Ok(ConfirmationResponse::Signature(0.into()))); break } ::std::thread::sleep(Duration::from_millis(100)) @@ -187,7 +188,8 @@ fn should_check_status_of_request_when_its_resolved() { "id": 1 }"#; tester.io.handle_request_sync(&request).expect("Sent"); - tester.signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Signature(1.into()))); + let sender = tester.signer.take(&1.into()).unwrap(); + tester.signer.request_confirmed(sender, Ok(ConfirmationResponse::Signature(1.into()))); // This is not ideal, but we need to give futures some time to be executed, and they need to run in a separate thread thread::sleep(Duration::from_millis(20)); @@ -258,7 +260,8 @@ fn should_add_transaction_to_queue() { ::std::thread::spawn(move || loop { if signer.requests().len() == 1 { // respond - signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SendTransaction(0.into()))); + let sender = signer.take(&1.into()).unwrap(); + signer.request_confirmed(sender, Ok(ConfirmationResponse::SendTransaction(0.into()))); break } ::std::thread::sleep(Duration::from_millis(100)) @@ -334,7 +337,8 @@ fn should_add_sign_transaction_to_the_queue() { ::std::thread::spawn(move || loop { if signer.requests().len() == 1 { // respond - signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SignTransaction( + let sender = signer.take(&1.into()).unwrap(); + signer.request_confirmed(sender, Ok(ConfirmationResponse::SignTransaction( RichRawTransaction::from_signed(t.into(), 0x0, u64::max_value()) ))); break @@ -440,7 +444,8 @@ fn should_add_decryption_to_the_queue() { ::std::thread::spawn(move || loop { if signer.requests().len() == 1 { // respond - signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Decrypt(vec![0x1, 0x2].into()))); + let sender = signer.take(&1.into()).unwrap(); + signer.request_confirmed(sender, Ok(ConfirmationResponse::Decrypt(vec![0x1, 0x2].into()))); break } ::std::thread::sleep(Duration::from_millis(10))