Fix concurrent access to signer queue (#8854)
* Fix concurrent access to signer queue * Put request back to the queue if confirmation failed * typo: fix docs and rename functions to be more specific `request_notify` does not need to be public, and it's renamed to `notify_result`. `notify` is renamed to `notify_message`. * Change trace info "Transaction" -> "Request"
This commit is contained in:
		
							parent
							
								
									0127aff399
								
							
						
					
					
						commit
						2b8501a4b0
					
				| @ -45,7 +45,7 @@ pub use self::requests::{ | ||||
| 	TransactionRequest, FilledTransactionRequest, ConfirmationRequest, ConfirmationPayload, CallRequest, | ||||
| }; | ||||
| pub use self::signing_queue::{ | ||||
| 	ConfirmationsQueue, ConfirmationReceiver, ConfirmationResult, | ||||
| 	ConfirmationsQueue, ConfirmationReceiver, ConfirmationResult, ConfirmationSender, | ||||
| 	SigningQueue, QueueEvent, DefaultAccount, | ||||
| 	QUEUE_LIMIT as SIGNING_QUEUE_LIMIT, | ||||
| }; | ||||
|  | ||||
| @ -75,16 +75,17 @@ pub trait SigningQueue: Send + Sync { | ||||
| 	/// `ConfirmationReceiver` is a `Future` awaiting for resolution of the given request.
 | ||||
| 	fn add_request(&self, request: ConfirmationPayload, origin: Origin) -> Result<(U256, ConfirmationReceiver), QueueAddError>; | ||||
| 
 | ||||
| 	/// Removes a request from the queue.
 | ||||
| 	/// Notifies possible token holders that request was rejected.
 | ||||
| 	fn request_rejected(&self, id: U256) -> Option<ConfirmationRequest>; | ||||
| 	fn request_rejected(&self, sender: ConfirmationSender) -> Option<ConfirmationRequest>; | ||||
| 
 | ||||
| 	/// Removes a request from the queue.
 | ||||
| 	/// Notifies possible token holders that request was confirmed and given hash was assigned.
 | ||||
| 	fn request_confirmed(&self, id: U256, result: ConfirmationResult) -> Option<ConfirmationRequest>; | ||||
| 	fn request_confirmed(&self, sender: ConfirmationSender, result: ConfirmationResult) -> Option<ConfirmationRequest>; | ||||
| 
 | ||||
| 	/// Returns a request if it is contained in the queue.
 | ||||
| 	fn peek(&self, id: &U256) -> Option<ConfirmationRequest>; | ||||
| 	/// Put a request taken from `SigningQueue::take` back to the queue.
 | ||||
| 	fn request_untouched(&self, sender: ConfirmationSender); | ||||
| 
 | ||||
| 	/// Returns and removes a request if it is contained in the queue.
 | ||||
| 	fn take(&self, id: &U256) -> Option<ConfirmationSender>; | ||||
| 
 | ||||
| 	/// Return copy of all the requests in the queue.
 | ||||
| 	fn requests(&self) -> Vec<ConfirmationRequest>; | ||||
| @ -96,9 +97,12 @@ pub trait SigningQueue: Send + Sync { | ||||
| 	fn is_empty(&self) -> bool; | ||||
| } | ||||
| 
 | ||||
| struct ConfirmationSender { | ||||
| /// Confirmation request information with result notifier.
 | ||||
| pub struct ConfirmationSender { | ||||
| 	/// Confirmation request information.
 | ||||
| 	pub request: ConfirmationRequest, | ||||
| 
 | ||||
| 	sender: oneshot::Sender<ConfirmationResult>, | ||||
| 	request: ConfirmationRequest, | ||||
| } | ||||
| 
 | ||||
| /// Receiving end of the Confirmation channel; can be used as a `Future` to await for `ConfirmationRequest`
 | ||||
| @ -122,27 +126,16 @@ impl ConfirmationsQueue { | ||||
| 	/// Notifies consumer that the communcation is over.
 | ||||
| 	/// No more events will be sent after this function is invoked.
 | ||||
| 	pub fn finish(&self) { | ||||
| 		self.notify(QueueEvent::Finish); | ||||
| 		self.notify_message(QueueEvent::Finish); | ||||
| 		self.on_event.write().clear(); | ||||
| 	} | ||||
| 
 | ||||
| 	/// Notifies receiver about the event happening in this queue.
 | ||||
| 	fn notify(&self, message: QueueEvent) { | ||||
| 		for listener in &*self.on_event.read() { | ||||
| 			listener(message.clone()) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	/// Removes requests from this queue and notifies `ConfirmationReceiver` holder about the result.
 | ||||
| 	/// Notifies also a receiver about that event.
 | ||||
| 	fn remove(&self, id: U256, result: Option<ConfirmationResult>) -> Option<ConfirmationRequest> { | ||||
| 		let sender = self.queue.write().remove(&id); | ||||
| 
 | ||||
| 		if let Some(sender) = sender { | ||||
| 	/// Notifies `ConfirmationReceiver` holder about the result given a request.
 | ||||
| 	fn notify_result(&self, sender: ConfirmationSender, result: Option<ConfirmationResult>) -> Option<ConfirmationRequest> { | ||||
| 		// notify receiver about the event
 | ||||
| 			self.notify(result.clone().map_or_else( | ||||
| 				|| QueueEvent::RequestRejected(id), | ||||
| 				|_| QueueEvent::RequestConfirmed(id) | ||||
| 		self.notify_message(result.clone().map_or_else( | ||||
| 			|| QueueEvent::RequestRejected(sender.request.id), | ||||
| 			|_| QueueEvent::RequestConfirmed(sender.request.id) | ||||
| 		)); | ||||
| 
 | ||||
| 		// notify confirmation receiver about resolution
 | ||||
| @ -150,8 +143,12 @@ impl ConfirmationsQueue { | ||||
| 		sender.sender.send(result); | ||||
| 
 | ||||
| 		Some(sender.request) | ||||
| 		} else { | ||||
| 			None | ||||
| 	} | ||||
| 
 | ||||
| 	/// Notifies receiver about the event happening in this queue.
 | ||||
| 	fn notify_message(&self, message: QueueEvent) { | ||||
| 		for listener in &*self.on_event.read() { | ||||
| 			listener(message.clone()) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| @ -193,22 +190,26 @@ impl SigningQueue for ConfirmationsQueue { | ||||
| 			(id, receiver) | ||||
| 		}; | ||||
| 		// Notify listeners
 | ||||
| 		self.notify(QueueEvent::NewRequest(id)); | ||||
| 		self.notify_message(QueueEvent::NewRequest(id)); | ||||
| 		Ok(res) | ||||
| 	} | ||||
| 
 | ||||
| 	fn peek(&self, id: &U256) -> Option<ConfirmationRequest> { | ||||
| 		self.queue.read().get(id).map(|sender| sender.request.clone()) | ||||
| 	fn take(&self, id: &U256) -> Option<ConfirmationSender> { | ||||
| 		self.queue.write().remove(id) | ||||
| 	} | ||||
| 
 | ||||
| 	fn request_rejected(&self, id: U256) -> Option<ConfirmationRequest> { | ||||
| 		debug!(target: "own_tx", "Signer: Request rejected ({:?}).", id); | ||||
| 		self.remove(id, None) | ||||
| 	fn request_rejected(&self, sender: ConfirmationSender) -> Option<ConfirmationRequest> { | ||||
| 		debug!(target: "own_tx", "Signer: Request rejected ({:?}).", sender.request.id); | ||||
| 		self.notify_result(sender, None) | ||||
| 	} | ||||
| 
 | ||||
| 	fn request_confirmed(&self, id: U256, result: ConfirmationResult) -> Option<ConfirmationRequest> { | ||||
| 		debug!(target: "own_tx", "Signer: Transaction confirmed ({:?}).", id); | ||||
| 		self.remove(id, Some(result)) | ||||
| 	fn request_confirmed(&self, sender: ConfirmationSender, result: ConfirmationResult) -> Option<ConfirmationRequest> { | ||||
| 		debug!(target: "own_tx", "Signer: Request confirmed ({:?}).", sender.request.id); | ||||
| 		self.notify_result(sender, Some(result)) | ||||
| 	} | ||||
| 
 | ||||
| 	fn request_untouched(&self, sender: ConfirmationSender) { | ||||
| 		self.queue.write().insert(sender.request.id, sender); | ||||
| 	} | ||||
| 
 | ||||
| 	fn requests(&self) -> Vec<ConfirmationRequest> { | ||||
| @ -261,7 +262,8 @@ mod test { | ||||
| 
 | ||||
| 		// when
 | ||||
| 		let (id, future) = queue.add_request(request, Default::default()).unwrap(); | ||||
| 		queue.request_confirmed(id, Ok(ConfirmationResponse::SendTransaction(1.into()))); | ||||
| 		let sender = queue.take(&id).unwrap(); | ||||
| 		queue.request_confirmed(sender, Ok(ConfirmationResponse::SendTransaction(1.into()))); | ||||
| 
 | ||||
| 		// then
 | ||||
| 		let confirmation = future.wait().unwrap(); | ||||
|  | ||||
| @ -92,11 +92,11 @@ impl<D: Dispatcher + 'static> SignerClient<D> { | ||||
| 		let dispatcher = self.dispatcher.clone(); | ||||
| 		let signer = self.signer.clone(); | ||||
| 
 | ||||
| 		Box::new(signer.peek(&id).map(|confirmation| { | ||||
| 			let mut payload = confirmation.payload.clone(); | ||||
| 		Box::new(signer.take(&id).map(|sender| { | ||||
| 			let mut payload = sender.request.payload.clone(); | ||||
| 			// Modify payload
 | ||||
| 			if let ConfirmationPayload::SendTransaction(ref mut request) = payload { | ||||
| 				if let Some(sender) = modification.sender.clone() { | ||||
| 				if let Some(sender) = modification.sender { | ||||
| 					request.from = sender.into(); | ||||
| 					// Altering sender should always reset the nonce.
 | ||||
| 					request.nonce = None; | ||||
| @ -115,7 +115,9 @@ impl<D: Dispatcher + 'static> SignerClient<D> { | ||||
| 			Either::A(fut.into_future().then(move |result| { | ||||
| 				// Execute
 | ||||
| 				if let Ok(ref response) = result { | ||||
| 					signer.request_confirmed(id, Ok((*response).clone())); | ||||
| 					signer.request_confirmed(sender, Ok((*response).clone())); | ||||
| 				} else { | ||||
| 					signer.request_untouched(sender); | ||||
| 				} | ||||
| 
 | ||||
| 				result | ||||
| @ -194,8 +196,9 @@ impl<D: Dispatcher + 'static> Signer for SignerClient<D> { | ||||
| 	fn confirm_request_raw(&self, id: U256, bytes: Bytes) -> Result<ConfirmationResponse> { | ||||
| 		let id = id.into(); | ||||
| 
 | ||||
| 		self.signer.peek(&id).map(|confirmation| { | ||||
| 			let result = match confirmation.payload { | ||||
| 		self.signer.take(&id).map(|sender| { | ||||
| 			let payload = sender.request.payload.clone(); | ||||
| 			let result = match payload { | ||||
| 				ConfirmationPayload::SendTransaction(request) => { | ||||
| 					Self::verify_transaction(bytes, request, |pending_transaction| { | ||||
| 						self.dispatcher.dispatch_transaction(pending_transaction) | ||||
| @ -224,14 +227,16 @@ impl<D: Dispatcher + 'static> Signer for SignerClient<D> { | ||||
| 				}, | ||||
| 			}; | ||||
| 			if let Ok(ref response) = result { | ||||
| 				self.signer.request_confirmed(id, Ok(response.clone())); | ||||
| 				self.signer.request_confirmed(sender, Ok(response.clone())); | ||||
| 			} else { | ||||
| 				self.signer.request_untouched(sender); | ||||
| 			} | ||||
| 			result | ||||
| 		}).unwrap_or_else(|| Err(errors::invalid_params("Unknown RequestID", id))) | ||||
| 	} | ||||
| 
 | ||||
| 	fn reject_request(&self, id: U256) -> Result<bool> { | ||||
| 		let res = self.signer.request_rejected(id.into()); | ||||
| 		let res = self.signer.take(&id.into()).map(|sender| self.signer.request_rejected(sender)); | ||||
| 		Ok(res.is_some()) | ||||
| 	} | ||||
| 
 | ||||
|  | ||||
| @ -110,7 +110,8 @@ fn should_add_sign_to_queue() { | ||||
| 	::std::thread::spawn(move || loop { | ||||
| 		if signer.requests().len() == 1 { | ||||
| 			// respond
 | ||||
| 			signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Signature(0.into()))); | ||||
| 			let sender = signer.take(&1.into()).unwrap(); | ||||
| 			signer.request_confirmed(sender, Ok(ConfirmationResponse::Signature(0.into()))); | ||||
| 			break
 | ||||
| 		} | ||||
| 		::std::thread::sleep(Duration::from_millis(100)) | ||||
| @ -188,7 +189,8 @@ fn should_check_status_of_request_when_its_resolved() { | ||||
| 		"id": 1 | ||||
| 	}"#;
 | ||||
| 	tester.io.handle_request_sync(&request).expect("Sent"); | ||||
| 	tester.signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Signature(1.into()))); | ||||
| 	let sender = tester.signer.take(&1.into()).unwrap(); | ||||
| 	tester.signer.request_confirmed(sender, Ok(ConfirmationResponse::Signature(1.into()))); | ||||
| 
 | ||||
| 	// This is not ideal, but we need to give futures some time to be executed, and they need to run in a separate thread
 | ||||
| 	thread::sleep(Duration::from_millis(20)); | ||||
| @ -259,7 +261,8 @@ fn should_add_transaction_to_queue() { | ||||
| 	::std::thread::spawn(move || loop { | ||||
| 		if signer.requests().len() == 1 { | ||||
| 			// respond
 | ||||
| 			signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SendTransaction(0.into()))); | ||||
| 			let sender = signer.take(&1.into()).unwrap(); | ||||
| 			signer.request_confirmed(sender, Ok(ConfirmationResponse::SendTransaction(0.into()))); | ||||
| 			break
 | ||||
| 		} | ||||
| 		::std::thread::sleep(Duration::from_millis(100)) | ||||
| @ -335,7 +338,8 @@ fn should_add_sign_transaction_to_the_queue() { | ||||
| 	::std::thread::spawn(move || loop { | ||||
| 		if signer.requests().len() == 1 { | ||||
| 			// respond
 | ||||
| 			signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SignTransaction( | ||||
| 			let sender = signer.take(&1.into()).unwrap(); | ||||
| 			signer.request_confirmed(sender, Ok(ConfirmationResponse::SignTransaction( | ||||
| 				RichRawTransaction::from_signed(t.into(), 0x0, u64::max_value()) | ||||
| 			))); | ||||
| 			break
 | ||||
| @ -442,7 +446,8 @@ fn should_add_decryption_to_the_queue() { | ||||
| 	::std::thread::spawn(move || loop { | ||||
| 		if signer.requests().len() == 1 { | ||||
| 			// respond
 | ||||
| 			signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Decrypt(vec![0x1, 0x2].into()))); | ||||
| 			let sender = signer.take(&1.into()).unwrap(); | ||||
| 			signer.request_confirmed(sender, Ok(ConfirmationResponse::Decrypt(vec![0x1, 0x2].into()))); | ||||
| 			break
 | ||||
| 		} | ||||
| 		::std::thread::sleep(Duration::from_millis(10)) | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user