Fix concurrent access to signer queue (#8854)
* Fix concurrent access to signer queue * Put request back to the queue if confirmation failed * typo: fix docs and rename functions to be more specific `request_notify` does not need to be public, and it's renamed to `notify_result`. `notify` is renamed to `notify_message`. * Change trace info "Transaction" -> "Request"
This commit is contained in:
parent
0bb78814a6
commit
b34d46cbc8
@ -44,7 +44,7 @@ pub use self::requests::{
|
|||||||
TransactionRequest, FilledTransactionRequest, ConfirmationRequest, ConfirmationPayload, CallRequest,
|
TransactionRequest, FilledTransactionRequest, ConfirmationRequest, ConfirmationPayload, CallRequest,
|
||||||
};
|
};
|
||||||
pub use self::signing_queue::{
|
pub use self::signing_queue::{
|
||||||
ConfirmationsQueue, ConfirmationReceiver, ConfirmationResult,
|
ConfirmationsQueue, ConfirmationReceiver, ConfirmationResult, ConfirmationSender,
|
||||||
SigningQueue, QueueEvent, DefaultAccount,
|
SigningQueue, QueueEvent, DefaultAccount,
|
||||||
QUEUE_LIMIT as SIGNING_QUEUE_LIMIT,
|
QUEUE_LIMIT as SIGNING_QUEUE_LIMIT,
|
||||||
};
|
};
|
||||||
|
@ -75,16 +75,17 @@ pub trait SigningQueue: Send + Sync {
|
|||||||
/// `ConfirmationReceiver` is a `Future` awaiting for resolution of the given request.
|
/// `ConfirmationReceiver` is a `Future` awaiting for resolution of the given request.
|
||||||
fn add_request(&self, request: ConfirmationPayload, origin: Origin) -> Result<(U256, ConfirmationReceiver), QueueAddError>;
|
fn add_request(&self, request: ConfirmationPayload, origin: Origin) -> Result<(U256, ConfirmationReceiver), QueueAddError>;
|
||||||
|
|
||||||
/// Removes a request from the queue.
|
|
||||||
/// Notifies possible token holders that request was rejected.
|
/// Notifies possible token holders that request was rejected.
|
||||||
fn request_rejected(&self, id: U256) -> Option<ConfirmationRequest>;
|
fn request_rejected(&self, sender: ConfirmationSender) -> Option<ConfirmationRequest>;
|
||||||
|
|
||||||
/// Removes a request from the queue.
|
|
||||||
/// Notifies possible token holders that request was confirmed and given hash was assigned.
|
/// Notifies possible token holders that request was confirmed and given hash was assigned.
|
||||||
fn request_confirmed(&self, id: U256, result: ConfirmationResult) -> Option<ConfirmationRequest>;
|
fn request_confirmed(&self, sender: ConfirmationSender, result: ConfirmationResult) -> Option<ConfirmationRequest>;
|
||||||
|
|
||||||
/// Returns a request if it is contained in the queue.
|
/// Put a request taken from `SigningQueue::take` back to the queue.
|
||||||
fn peek(&self, id: &U256) -> Option<ConfirmationRequest>;
|
fn request_untouched(&self, sender: ConfirmationSender);
|
||||||
|
|
||||||
|
/// Returns and removes a request if it is contained in the queue.
|
||||||
|
fn take(&self, id: &U256) -> Option<ConfirmationSender>;
|
||||||
|
|
||||||
/// Return copy of all the requests in the queue.
|
/// Return copy of all the requests in the queue.
|
||||||
fn requests(&self) -> Vec<ConfirmationRequest>;
|
fn requests(&self) -> Vec<ConfirmationRequest>;
|
||||||
@ -96,9 +97,12 @@ pub trait SigningQueue: Send + Sync {
|
|||||||
fn is_empty(&self) -> bool;
|
fn is_empty(&self) -> bool;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ConfirmationSender {
|
/// Confirmation request information with result notifier.
|
||||||
|
pub struct ConfirmationSender {
|
||||||
|
/// Confirmation request information.
|
||||||
|
pub request: ConfirmationRequest,
|
||||||
|
|
||||||
sender: oneshot::Sender<ConfirmationResult>,
|
sender: oneshot::Sender<ConfirmationResult>,
|
||||||
request: ConfirmationRequest,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receiving end of the Confirmation channel; can be used as a `Future` to await for `ConfirmationRequest`
|
/// Receiving end of the Confirmation channel; can be used as a `Future` to await for `ConfirmationRequest`
|
||||||
@ -122,36 +126,29 @@ impl ConfirmationsQueue {
|
|||||||
/// Notifies consumer that the communcation is over.
|
/// Notifies consumer that the communcation is over.
|
||||||
/// No more events will be sent after this function is invoked.
|
/// No more events will be sent after this function is invoked.
|
||||||
pub fn finish(&self) {
|
pub fn finish(&self) {
|
||||||
self.notify(QueueEvent::Finish);
|
self.notify_message(QueueEvent::Finish);
|
||||||
self.on_event.write().clear();
|
self.on_event.write().clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Notifies receiver about the event happening in this queue.
|
/// Notifies `ConfirmationReceiver` holder about the result given a request.
|
||||||
fn notify(&self, message: QueueEvent) {
|
fn notify_result(&self, sender: ConfirmationSender, result: Option<ConfirmationResult>) -> Option<ConfirmationRequest> {
|
||||||
for listener in &*self.on_event.read() {
|
// notify receiver about the event
|
||||||
listener(message.clone())
|
self.notify_message(result.clone().map_or_else(
|
||||||
}
|
|| QueueEvent::RequestRejected(sender.request.id),
|
||||||
|
|_| QueueEvent::RequestConfirmed(sender.request.id)
|
||||||
|
));
|
||||||
|
|
||||||
|
// notify confirmation receiver about resolution
|
||||||
|
let result = result.ok_or(errors::request_rejected());
|
||||||
|
sender.sender.send(result);
|
||||||
|
|
||||||
|
Some(sender.request)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Removes requests from this queue and notifies `ConfirmationReceiver` holder about the result.
|
/// Notifies receiver about the event happening in this queue.
|
||||||
/// Notifies also a receiver about that event.
|
fn notify_message(&self, message: QueueEvent) {
|
||||||
fn remove(&self, id: U256, result: Option<ConfirmationResult>) -> Option<ConfirmationRequest> {
|
for listener in &*self.on_event.read() {
|
||||||
let sender = self.queue.write().remove(&id);
|
listener(message.clone())
|
||||||
|
|
||||||
if let Some(sender) = sender {
|
|
||||||
// notify receiver about the event
|
|
||||||
self.notify(result.clone().map_or_else(
|
|
||||||
|| QueueEvent::RequestRejected(id),
|
|
||||||
|_| QueueEvent::RequestConfirmed(id)
|
|
||||||
));
|
|
||||||
|
|
||||||
// notify confirmation receiver about resolution
|
|
||||||
let result = result.ok_or(errors::request_rejected());
|
|
||||||
sender.sender.send(result);
|
|
||||||
|
|
||||||
Some(sender.request)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -193,22 +190,26 @@ impl SigningQueue for ConfirmationsQueue {
|
|||||||
(id, receiver)
|
(id, receiver)
|
||||||
};
|
};
|
||||||
// Notify listeners
|
// Notify listeners
|
||||||
self.notify(QueueEvent::NewRequest(id));
|
self.notify_message(QueueEvent::NewRequest(id));
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn peek(&self, id: &U256) -> Option<ConfirmationRequest> {
|
fn take(&self, id: &U256) -> Option<ConfirmationSender> {
|
||||||
self.queue.read().get(id).map(|sender| sender.request.clone())
|
self.queue.write().remove(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn request_rejected(&self, id: U256) -> Option<ConfirmationRequest> {
|
fn request_rejected(&self, sender: ConfirmationSender) -> Option<ConfirmationRequest> {
|
||||||
debug!(target: "own_tx", "Signer: Request rejected ({:?}).", id);
|
debug!(target: "own_tx", "Signer: Request rejected ({:?}).", sender.request.id);
|
||||||
self.remove(id, None)
|
self.notify_result(sender, None)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn request_confirmed(&self, id: U256, result: ConfirmationResult) -> Option<ConfirmationRequest> {
|
fn request_confirmed(&self, sender: ConfirmationSender, result: ConfirmationResult) -> Option<ConfirmationRequest> {
|
||||||
debug!(target: "own_tx", "Signer: Transaction confirmed ({:?}).", id);
|
debug!(target: "own_tx", "Signer: Request confirmed ({:?}).", sender.request.id);
|
||||||
self.remove(id, Some(result))
|
self.notify_result(sender, Some(result))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn request_untouched(&self, sender: ConfirmationSender) {
|
||||||
|
self.queue.write().insert(sender.request.id, sender);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn requests(&self) -> Vec<ConfirmationRequest> {
|
fn requests(&self) -> Vec<ConfirmationRequest> {
|
||||||
@ -260,7 +261,8 @@ mod test {
|
|||||||
|
|
||||||
// when
|
// when
|
||||||
let (id, future) = queue.add_request(request, Default::default()).unwrap();
|
let (id, future) = queue.add_request(request, Default::default()).unwrap();
|
||||||
queue.request_confirmed(id, Ok(ConfirmationResponse::SendTransaction(1.into())));
|
let sender = queue.take(&id).unwrap();
|
||||||
|
queue.request_confirmed(sender, Ok(ConfirmationResponse::SendTransaction(1.into())));
|
||||||
|
|
||||||
// then
|
// then
|
||||||
let confirmation = future.wait().unwrap();
|
let confirmation = future.wait().unwrap();
|
||||||
|
@ -86,11 +86,11 @@ impl<D: Dispatcher + 'static> SignerClient<D> {
|
|||||||
let dispatcher = self.dispatcher.clone();
|
let dispatcher = self.dispatcher.clone();
|
||||||
let signer = self.signer.clone();
|
let signer = self.signer.clone();
|
||||||
|
|
||||||
Box::new(signer.peek(&id).map(|confirmation| {
|
Box::new(signer.take(&id).map(|sender| {
|
||||||
let mut payload = confirmation.payload.clone();
|
let mut payload = sender.request.payload.clone();
|
||||||
// Modify payload
|
// Modify payload
|
||||||
if let ConfirmationPayload::SendTransaction(ref mut request) = payload {
|
if let ConfirmationPayload::SendTransaction(ref mut request) = payload {
|
||||||
if let Some(sender) = modification.sender.clone() {
|
if let Some(sender) = modification.sender {
|
||||||
request.from = sender.into();
|
request.from = sender.into();
|
||||||
// Altering sender should always reset the nonce.
|
// Altering sender should always reset the nonce.
|
||||||
request.nonce = None;
|
request.nonce = None;
|
||||||
@ -109,7 +109,9 @@ impl<D: Dispatcher + 'static> SignerClient<D> {
|
|||||||
Either::A(fut.into_future().then(move |result| {
|
Either::A(fut.into_future().then(move |result| {
|
||||||
// Execute
|
// Execute
|
||||||
if let Ok(ref response) = result {
|
if let Ok(ref response) = result {
|
||||||
signer.request_confirmed(id, Ok((*response).clone()));
|
signer.request_confirmed(sender, Ok((*response).clone()));
|
||||||
|
} else {
|
||||||
|
signer.request_untouched(sender);
|
||||||
}
|
}
|
||||||
|
|
||||||
result
|
result
|
||||||
@ -188,8 +190,9 @@ impl<D: Dispatcher + 'static> Signer for SignerClient<D> {
|
|||||||
fn confirm_request_raw(&self, id: U256, bytes: Bytes) -> Result<ConfirmationResponse> {
|
fn confirm_request_raw(&self, id: U256, bytes: Bytes) -> Result<ConfirmationResponse> {
|
||||||
let id = id.into();
|
let id = id.into();
|
||||||
|
|
||||||
self.signer.peek(&id).map(|confirmation| {
|
self.signer.take(&id).map(|sender| {
|
||||||
let result = match confirmation.payload {
|
let payload = sender.request.payload.clone();
|
||||||
|
let result = match payload {
|
||||||
ConfirmationPayload::SendTransaction(request) => {
|
ConfirmationPayload::SendTransaction(request) => {
|
||||||
Self::verify_transaction(bytes, request, |pending_transaction| {
|
Self::verify_transaction(bytes, request, |pending_transaction| {
|
||||||
self.dispatcher.dispatch_transaction(pending_transaction)
|
self.dispatcher.dispatch_transaction(pending_transaction)
|
||||||
@ -218,14 +221,16 @@ impl<D: Dispatcher + 'static> Signer for SignerClient<D> {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
if let Ok(ref response) = result {
|
if let Ok(ref response) = result {
|
||||||
self.signer.request_confirmed(id, Ok(response.clone()));
|
self.signer.request_confirmed(sender, Ok(response.clone()));
|
||||||
|
} else {
|
||||||
|
self.signer.request_untouched(sender);
|
||||||
}
|
}
|
||||||
result
|
result
|
||||||
}).unwrap_or_else(|| Err(errors::invalid_params("Unknown RequestID", id)))
|
}).unwrap_or_else(|| Err(errors::invalid_params("Unknown RequestID", id)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn reject_request(&self, id: U256) -> Result<bool> {
|
fn reject_request(&self, id: U256) -> Result<bool> {
|
||||||
let res = self.signer.request_rejected(id.into());
|
let res = self.signer.take(&id.into()).map(|sender| self.signer.request_rejected(sender));
|
||||||
Ok(res.is_some())
|
Ok(res.is_some())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,7 +109,8 @@ fn should_add_sign_to_queue() {
|
|||||||
::std::thread::spawn(move || loop {
|
::std::thread::spawn(move || loop {
|
||||||
if signer.requests().len() == 1 {
|
if signer.requests().len() == 1 {
|
||||||
// respond
|
// respond
|
||||||
signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Signature(0.into())));
|
let sender = signer.take(&1.into()).unwrap();
|
||||||
|
signer.request_confirmed(sender, Ok(ConfirmationResponse::Signature(0.into())));
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
::std::thread::sleep(Duration::from_millis(100))
|
::std::thread::sleep(Duration::from_millis(100))
|
||||||
@ -187,7 +188,8 @@ fn should_check_status_of_request_when_its_resolved() {
|
|||||||
"id": 1
|
"id": 1
|
||||||
}"#;
|
}"#;
|
||||||
tester.io.handle_request_sync(&request).expect("Sent");
|
tester.io.handle_request_sync(&request).expect("Sent");
|
||||||
tester.signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Signature(1.into())));
|
let sender = tester.signer.take(&1.into()).unwrap();
|
||||||
|
tester.signer.request_confirmed(sender, Ok(ConfirmationResponse::Signature(1.into())));
|
||||||
|
|
||||||
// This is not ideal, but we need to give futures some time to be executed, and they need to run in a separate thread
|
// This is not ideal, but we need to give futures some time to be executed, and they need to run in a separate thread
|
||||||
thread::sleep(Duration::from_millis(20));
|
thread::sleep(Duration::from_millis(20));
|
||||||
@ -258,7 +260,8 @@ fn should_add_transaction_to_queue() {
|
|||||||
::std::thread::spawn(move || loop {
|
::std::thread::spawn(move || loop {
|
||||||
if signer.requests().len() == 1 {
|
if signer.requests().len() == 1 {
|
||||||
// respond
|
// respond
|
||||||
signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SendTransaction(0.into())));
|
let sender = signer.take(&1.into()).unwrap();
|
||||||
|
signer.request_confirmed(sender, Ok(ConfirmationResponse::SendTransaction(0.into())));
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
::std::thread::sleep(Duration::from_millis(100))
|
::std::thread::sleep(Duration::from_millis(100))
|
||||||
@ -334,7 +337,8 @@ fn should_add_sign_transaction_to_the_queue() {
|
|||||||
::std::thread::spawn(move || loop {
|
::std::thread::spawn(move || loop {
|
||||||
if signer.requests().len() == 1 {
|
if signer.requests().len() == 1 {
|
||||||
// respond
|
// respond
|
||||||
signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SignTransaction(
|
let sender = signer.take(&1.into()).unwrap();
|
||||||
|
signer.request_confirmed(sender, Ok(ConfirmationResponse::SignTransaction(
|
||||||
RichRawTransaction::from_signed(t.into(), 0x0, u64::max_value())
|
RichRawTransaction::from_signed(t.into(), 0x0, u64::max_value())
|
||||||
)));
|
)));
|
||||||
break
|
break
|
||||||
@ -440,7 +444,8 @@ fn should_add_decryption_to_the_queue() {
|
|||||||
::std::thread::spawn(move || loop {
|
::std::thread::spawn(move || loop {
|
||||||
if signer.requests().len() == 1 {
|
if signer.requests().len() == 1 {
|
||||||
// respond
|
// respond
|
||||||
signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Decrypt(vec![0x1, 0x2].into())));
|
let sender = signer.take(&1.into()).unwrap();
|
||||||
|
signer.request_confirmed(sender, Ok(ConfirmationResponse::Decrypt(vec![0x1, 0x2].into())));
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
::std::thread::sleep(Duration::from_millis(10))
|
::std::thread::sleep(Duration::from_millis(10))
|
||||||
|
Loading…
Reference in New Issue
Block a user