From 1073d56245b2087f8b6a0cd605c7572eaea22a07 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Wed, 29 Aug 2018 14:31:04 +0200 Subject: [PATCH] Private packets verification and queue refactoring (#8715) * Verify private transaction before propagating * Private transactions queue reworked with tx pool queue direct usage * Styling fixed * Prevent resending private packets to the sender * Process signed private transaction packets via io queue * Test fixed * Build and test fixed after merge * Comments after review fixed * Signed transaction taken from verified * Fix after merge * Pool scoring generalized in order to use externally * Lib refactored according to the review comments * Ready state refactored * Redundant bound and copying removed * Fixed build after the merge * Forgotten case reworked * Review comments fixed * Logging reworked, target added * Fix after merge --- Cargo.lock | 3 + ethcore/private-tx/Cargo.toml | 2 + ethcore/private-tx/src/encryptor.rs | 2 +- ethcore/private-tx/src/error.rs | 2 + ethcore/private-tx/src/lib.rs | 338 +++++++++--------- ethcore/private-tx/src/messages.rs | 49 ++- .../private-tx/src/private_transactions.rs | 224 +++++++----- ethcore/service/Cargo.toml | 1 + ethcore/service/src/lib.rs | 1 + ethcore/service/src/service.rs | 21 +- ethcore/src/client/chain_notify.rs | 4 +- ethcore/src/test_helpers.rs | 4 +- ethcore/sync/src/api.rs | 9 +- ethcore/sync/src/chain/handler.rs | 33 +- ethcore/sync/src/chain/mod.rs | 37 +- ethcore/sync/src/chain/propagator.rs | 21 +- ethcore/sync/src/private_tx.rs | 23 +- ethcore/sync/src/tests/helpers.rs | 8 +- ethcore/sync/src/tests/private.rs | 5 +- miner/src/pool/local_transactions.rs | 2 +- miner/src/pool/mod.rs | 37 +- miner/src/pool/scoring.rs | 64 ++-- 22 files changed, 525 insertions(+), 365 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cf2abdd3e..88cebe3a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -741,6 +741,7 @@ dependencies = [ "ethkey 0.3.0", "fetch 0.1.0", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "keccak-hash 0.1.2 (git+https://github.com/paritytech/parity-common)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "parity-bytes 0.1.0 (git+https://github.com/paritytech/parity-common)", @@ -756,6 +757,7 @@ dependencies = [ "serde_derive 1.0.37 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", "tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "transaction-pool 1.13.1", "url 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -808,6 +810,7 @@ dependencies = [ "ethcore-io 1.12.0", "ethcore-private-tx 1.0.0", "ethcore-sync 1.12.0", + "ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb 0.1.0 (git+https://github.com/paritytech/parity-common)", "kvdb-rocksdb 0.1.0 (git+https://github.com/paritytech/parity-common)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/private-tx/Cargo.toml b/ethcore/private-tx/Cargo.toml index 2383443e7..cebf388f1 100644 --- a/ethcore/private-tx/Cargo.toml +++ b/ethcore/private-tx/Cargo.toml @@ -22,6 +22,7 @@ ethjson = { path = "../../json" } ethkey = { path = "../../ethkey" } fetch = { path = "../../util/fetch" } futures = "0.1" +heapsize = "0.4" keccak-hash = { git = "https://github.com/paritytech/parity-common" } log = "0.4" parking_lot = "0.6" @@ -35,6 +36,7 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" tiny-keccak = "1.4" +transaction-pool = { path = "../../transaction-pool" } url = "1" [dev-dependencies] diff --git a/ethcore/private-tx/src/encryptor.rs b/ethcore/private-tx/src/encryptor.rs index e64917add..c1d3d3fb8 100644 --- a/ethcore/private-tx/src/encryptor.rs +++ b/ethcore/private-tx/src/encryptor.rs @@ -208,7 +208,7 @@ impl Encryptor for SecretStoreEncryptor { let key = match self.retrieve_key("", false, contract_address, &*accounts) { Ok(key) => Ok(key), Err(Error(ErrorKind::EncryptionKeyNotFound(_), _)) => { - trace!("Key for account wasnt found in sstore. Creating. Address: {:?}", contract_address); + trace!(target: "privatetx", "Key for account wasnt found in sstore. Creating. Address: {:?}", contract_address); self.retrieve_key(&format!("/{}", self.config.threshold), true, contract_address, &*accounts) } Err(err) => Err(err), diff --git a/ethcore/private-tx/src/error.rs b/ethcore/private-tx/src/error.rs index 55a75d6d9..99da149e4 100644 --- a/ethcore/private-tx/src/error.rs +++ b/ethcore/private-tx/src/error.rs @@ -21,12 +21,14 @@ use ethcore::account_provider::SignError; use ethcore::error::{Error as EthcoreError, ExecutionError}; use transaction::Error as TransactionError; use ethkey::Error as KeyError; +use txpool::Error as TxPoolError; error_chain! { foreign_links { Io(::std::io::Error) #[doc = "Error concerning the Rust standard library's IO subsystem."]; Decoder(DecoderError) #[doc = "RLP decoding error."]; Trie(TrieError) #[doc = "Error concerning TrieDBs."]; + Txpool(TxPoolError) #[doc = "Tx pool error."]; } errors { diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index a661197da..24727fe0f 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -37,9 +37,11 @@ extern crate ethkey; extern crate ethjson; extern crate fetch; extern crate futures; +extern crate heapsize; extern crate keccak_hash as hash; extern crate parking_lot; extern crate patricia_trie as trie; +extern crate transaction_pool as txpool; extern crate patricia_trie_ethereum as ethtrie; extern crate rlp; extern crate url; @@ -61,7 +63,7 @@ extern crate rand; extern crate ethcore_logger; pub use encryptor::{Encryptor, SecretStoreEncryptor, EncryptorConfig, NoopEncryptor}; -pub use private_transactions::{PrivateTransactionDesc, VerificationStore, PrivateTransactionSigningDesc, SigningStore}; +pub use private_transactions::{VerifiedPrivateTransaction, VerificationStore, PrivateTransactionSigningDesc, SigningStore}; pub use messages::{PrivateTransaction, SignedPrivateTransaction}; pub use error::{Error, ErrorKind}; @@ -71,7 +73,7 @@ use std::time::Duration; use ethereum_types::{H128, H256, U256, Address}; use hash::keccak; use rlp::*; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use bytes::Bytes; use ethkey::{Signature, recover, public_to_address}; use io::IoChannel; @@ -128,9 +130,8 @@ pub struct Provider { signer_account: Option
, passwords: Vec, notify: RwLock>>, - transactions_for_signing: Mutex, - // TODO [ToDr] Move the Mutex/RwLock inside `VerificationStore` after refactored to `drain`. - transactions_for_verification: Mutex, + transactions_for_signing: RwLock, + transactions_for_verification: VerificationStore, client: Arc, miner: Arc, accounts: Arc, @@ -161,8 +162,8 @@ impl Provider where { signer_account: config.signer_account, passwords: config.passwords, notify: RwLock::default(), - transactions_for_signing: Mutex::default(), - transactions_for_verification: Mutex::default(), + transactions_for_signing: RwLock::default(), + transactions_for_verification: VerificationStore::default(), client, miner, accounts, @@ -190,9 +191,9 @@ impl Provider where { /// 3. Save it with state returned on prev step to the queue for signing /// 4. Broadcast corresponding message to the chain pub fn create_private_transaction(&self, signed_transaction: SignedTransaction) -> Result { - trace!("Creating private transaction from regular transaction: {:?}", signed_transaction); + trace!(target: "privatetx", "Creating private transaction from regular transaction: {:?}", signed_transaction); if self.signer_account.is_none() { - trace!("Signing account not set"); + warn!(target: "privatetx", "Signing account not set"); bail!(ErrorKind::SignerAccountNotSet); } let tx_hash = signed_transaction.hash(); @@ -203,10 +204,7 @@ impl Provider where { Action::Call(contract) => { let data = signed_transaction.rlp_bytes(); let encrypted_transaction = self.encrypt(&contract, &Self::iv_from_transaction(&signed_transaction), &data)?; - let private = PrivateTransaction { - encrypted: encrypted_transaction, - contract, - }; + let private = PrivateTransaction::new(encrypted_transaction, contract); // TODO [ToDr] Using BlockId::Latest is bad here, // the block may change in the middle of execution // causing really weird stuff to happen. @@ -215,16 +213,16 @@ impl Provider where { // in private-tx to avoid such mistakes. let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?; let private_state = self.execute_private_transaction(BlockId::Latest, &signed_transaction)?; - trace!("Private transaction created, encrypted transaction: {:?}, private state: {:?}", private, private_state); + trace!(target: "privatetx", "Private transaction created, encrypted transaction: {:?}, private state: {:?}", private, private_state); let contract_validators = self.get_validators(BlockId::Latest, &contract)?; - trace!("Required validators: {:?}", contract_validators); + trace!(target: "privatetx", "Required validators: {:?}", contract_validators); let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); - trace!("Hashed effective private state for sender: {:?}", private_state_hash); - self.transactions_for_signing.lock().add_transaction(private.hash(), signed_transaction, contract_validators, private_state, contract_nonce)?; - self.broadcast_private_transaction(private.rlp_bytes().into_vec()); + trace!(target: "privatetx", "Hashed effective private state for sender: {:?}", private_state_hash); + self.transactions_for_signing.write().add_transaction(private.hash(), signed_transaction, contract_validators, private_state, contract_nonce)?; + self.broadcast_private_transaction(private.hash(), private.rlp_bytes().into_vec()); Ok(Receipt { hash: tx_hash, - contract_address: None, + contract_address: Some(contract), status_code: 0, }) } @@ -240,14 +238,6 @@ impl Provider where { keccak(&state_buf.as_ref()) } - /// Extract signed transaction from private transaction - fn extract_original_transaction(&self, private: PrivateTransaction, contract: &Address) -> Result { - let encrypted_transaction = private.encrypted; - let transaction_bytes = self.decrypt(contract, &encrypted_transaction)?; - let original_transaction: UnverifiedTransaction = Rlp::new(&transaction_bytes).as_val()?; - Ok(original_transaction) - } - fn pool_client<'a>(&'a self, nonce_cache: &'a NonceCache) -> miner::pool_client::PoolClient<'a, Client> { let engine = self.client.engine(); let refuse_service_transactions = true; @@ -261,48 +251,122 @@ impl Provider where { } /// Retrieve and verify the first available private transaction for every sender - /// - /// TODO [ToDr] It seems that: - /// The 3 methods `ready_transaction,get_descriptor,remove` are always used in conjuction so most likely - /// can be replaced with a single `drain()` method instead. - /// Thanks to this we also don't really need to lock the entire verification for the time of execution. - fn process_queue(&self) -> Result<(), Error> { + fn process_verification_queue(&self) -> Result<(), Error> { let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE); - let mut verification_queue = self.transactions_for_verification.lock(); - let ready_transactions = verification_queue.ready_transactions(self.pool_client(&nonce_cache)); - for transaction in ready_transactions { - let transaction_hash = transaction.signed().hash(); - match verification_queue.private_transaction_descriptor(&transaction_hash) { - Ok(desc) => { - if !self.validator_accounts.contains(&desc.validator_account) { - trace!("Cannot find validator account in config"); - bail!(ErrorKind::ValidatorAccountNotSet); + let process_transaction = |transaction: &VerifiedPrivateTransaction| -> Result<_, String> { + let private_hash = transaction.private_transaction.hash(); + match transaction.validator_account { + None => { + trace!(target: "privatetx", "Propagating transaction further"); + self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes().into_vec()); + return Ok(()); + } + Some(validator_account) => { + if !self.validator_accounts.contains(&validator_account) { + trace!(target: "privatetx", "Propagating transaction further"); + self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes().into_vec()); + return Ok(()); } - let account = desc.validator_account; - if let Action::Call(contract) = transaction.signed().action { + let tx_action = transaction.transaction.action.clone(); + if let Action::Call(contract) = tx_action { // TODO [ToDr] Usage of BlockId::Latest - let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?; - let private_state = self.execute_private_transaction(BlockId::Latest, transaction.signed())?; + let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest); + if let Err(e) = contract_nonce { + bail!("Cannot retrieve contract nonce: {:?}", e); + } + let contract_nonce = contract_nonce.expect("Error was checked before"); + let private_state = self.execute_private_transaction(BlockId::Latest, &transaction.transaction); + if let Err(e) = private_state { + bail!("Cannot retrieve private state: {:?}", e); + } + let private_state = private_state.expect("Error was checked before"); let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); - trace!("Hashed effective private state for validator: {:?}", private_state_hash); - let password = find_account_password(&self.passwords, &*self.accounts, &account); - let signed_state = self.accounts.sign(account, password, private_state_hash)?; - let signed_private_transaction = SignedPrivateTransaction::new(desc.private_hash, signed_state, None); - trace!("Sending signature for private transaction: {:?}", signed_private_transaction); - self.broadcast_signed_private_transaction(signed_private_transaction.rlp_bytes().into_vec()); + trace!(target: "privatetx", "Hashed effective private state for validator: {:?}", private_state_hash); + let password = find_account_password(&self.passwords, &*self.accounts, &validator_account); + let signed_state = self.accounts.sign(validator_account, password, private_state_hash); + if let Err(e) = signed_state { + bail!("Cannot sign the state: {:?}", e); + } + let signed_state = signed_state.expect("Error was checked before"); + let signed_private_transaction = SignedPrivateTransaction::new(private_hash, signed_state, None); + trace!(target: "privatetx", "Sending signature for private transaction: {:?}", signed_private_transaction); + self.broadcast_signed_private_transaction(signed_private_transaction.hash(), signed_private_transaction.rlp_bytes().into_vec()); } else { - warn!("Incorrect type of action for the transaction"); + bail!("Incorrect type of action for the transaction"); } - }, - Err(e) => { - warn!("Cannot retrieve descriptor for transaction with error {:?}", e); } } - verification_queue.remove_private_transaction(&transaction_hash); + Ok(()) + }; + let ready_transactions = self.transactions_for_verification.drain(self.pool_client(&nonce_cache)); + for transaction in ready_transactions { + if let Err(e) = process_transaction(&transaction) { + warn!(target: "privatetx", "Error: {:?}", e); + } } Ok(()) } + /// Add signed private transaction into the store + /// Creates corresponding public transaction if last required signature collected and sends it to the chain + pub fn process_signature(&self, signed_tx: &SignedPrivateTransaction) -> Result<(), Error> { + trace!(target: "privatetx", "Processing signed private transaction"); + let private_hash = signed_tx.private_transaction_hash(); + let desc = match self.transactions_for_signing.read().get(&private_hash) { + None => { + // Not our transaction, broadcast further to peers + self.broadcast_signed_private_transaction(signed_tx.hash(), signed_tx.rlp_bytes().into_vec()); + return Ok(()); + }, + Some(desc) => desc, + }; + let last = self.last_required_signature(&desc, signed_tx.signature())?; + + if last { + let mut signatures = desc.received_signatures.clone(); + signatures.push(signed_tx.signature()); + let rsv: Vec = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect(); + //Create public transaction + let public_tx = self.public_transaction( + desc.state.clone(), + &desc.original_transaction, + &rsv, + desc.original_transaction.nonce, + desc.original_transaction.gas_price + )?; + trace!(target: "privatetx", "Last required signature received, public transaction created: {:?}", public_tx); + //Sign and add it to the queue + let chain_id = desc.original_transaction.chain_id(); + let hash = public_tx.hash(chain_id); + let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?; + let password = find_account_password(&self.passwords, &*self.accounts, &signer_account); + let signature = self.accounts.sign(signer_account, password, hash)?; + let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?; + match self.miner.import_own_transaction(&*self.client, signed.into()) { + Ok(_) => trace!(target: "privatetx", "Public transaction added to queue"), + Err(err) => { + warn!(target: "privatetx", "Failed to add transaction to queue, error: {:?}", err); + bail!(err); + } + } + //Remove from store for signing + if let Err(err) = self.transactions_for_signing.write().remove(&private_hash) { + warn!(target: "privatetx", "Failed to remove transaction from signing store, error: {:?}", err); + bail!(err); + } + } else { + //Add signature to the store + match self.transactions_for_signing.write().add_signature(&private_hash, signed_tx.signature()) { + Ok(_) => trace!(target: "privatetx", "Signature stored for private transaction"), + Err(err) => { + warn!(target: "privatetx", "Failed to add signature to signing store, error: {:?}", err); + bail!(err); + } + } + } + Ok(()) + } + fn last_required_signature(&self, desc: &PrivateTransactionSigningDesc, sign: Signature) -> Result { if desc.received_signatures.contains(&sign) { return Ok(false); @@ -316,26 +380,26 @@ impl Provider where { Ok(desc.received_signatures.len() + 1 == desc.validators.len()) } false => { - trace!("Sender's state doesn't correspond to validator's"); + warn!(target: "privatetx", "Sender's state doesn't correspond to validator's"); bail!(ErrorKind::StateIncorrect); } } } Err(err) => { - trace!("Sender's state doesn't correspond to validator's, error {:?}", err); + warn!(target: "privatetx", "Sender's state doesn't correspond to validator's, error {:?}", err); bail!(err); } } } /// Broadcast the private transaction message to the chain - fn broadcast_private_transaction(&self, message: Bytes) { - self.notify(|notify| notify.broadcast(ChainMessageType::PrivateTransaction(message.clone()))); + fn broadcast_private_transaction(&self, transaction_hash: H256, message: Bytes) { + self.notify(|notify| notify.broadcast(ChainMessageType::PrivateTransaction(transaction_hash, message.clone()))); } /// Broadcast signed private transaction message to the chain - fn broadcast_signed_private_transaction(&self, message: Bytes) { - self.notify(|notify| notify.broadcast(ChainMessageType::SignedPrivateTransaction(message.clone()))); + fn broadcast_signed_private_transaction(&self, transaction_hash: H256, message: Bytes) { + self.notify(|notify| notify.broadcast(ChainMessageType::SignedPrivateTransaction(transaction_hash, message.clone()))); } fn iv_from_transaction(transaction: &SignedTransaction) -> H128 { @@ -351,12 +415,12 @@ impl Provider where { } fn encrypt(&self, contract_address: &Address, initialisation_vector: &H128, data: &[u8]) -> Result { - trace!("Encrypt data using key(address): {:?}", contract_address); + trace!(target: "privatetx", "Encrypt data using key(address): {:?}", contract_address); Ok(self.encryptor.encrypt(contract_address, &*self.accounts, initialisation_vector, data)?) } fn decrypt(&self, contract_address: &Address, data: &[u8]) -> Result { - trace!("Decrypt data using key(address): {:?}", contract_address); + trace!(target: "privatetx", "Decrypt data using key(address): {:?}", contract_address); Ok(self.encryptor.decrypt(contract_address, &*self.accounts, data)?) } @@ -421,7 +485,7 @@ impl Provider where { Action::Call(ref contract_address) => { let contract_code = Arc::new(self.get_decrypted_code(contract_address, block)?); let contract_state = self.get_decrypted_state(contract_address, block)?; - trace!("Patching contract at {:?}, code: {:?}, state: {:?}", contract_address, contract_code, contract_state); + trace!(target: "privatetx", "Patching contract at {:?}, code: {:?}, state: {:?}", contract_address, contract_code, contract_state); state.patch_account(contract_address, contract_code, Self::snapshot_to_storage(contract_state))?; Some(*contract_address) }, @@ -449,7 +513,7 @@ impl Provider where { (enc_code, self.encrypt(&address, &Self::iv_from_transaction(transaction), &Self::snapshot_from_storage(&storage))?) }, }; - trace!("Private contract executed. code: {:?}, state: {:?}, result: {:?}", encrypted_code, encrypted_storage, result.output); + trace!(target: "privatetx", "Private contract executed. code: {:?}, state: {:?}, result: {:?}", encrypted_code, encrypted_storage, result.output); Ok(PrivateExecutionResult { code: encrypted_code, state: encrypted_storage, @@ -550,12 +614,12 @@ impl Provider where { pub trait Importer { /// Process received private transaction - fn import_private_transaction(&self, _rlp: &[u8]) -> Result<(), Error>; + fn import_private_transaction(&self, _rlp: &[u8]) -> Result; /// Add signed private transaction into the store /// /// Creates corresponding public transaction if last required signature collected and sends it to the chain - fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<(), Error>; + fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result; } // TODO [ToDr] Offload more heavy stuff to the IoService thread. @@ -564,115 +628,59 @@ pub trait Importer { // for both verification and execution. impl Importer for Arc { - fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> { - trace!("Private transaction received"); + fn import_private_transaction(&self, rlp: &[u8]) -> Result { + trace!(target: "privatetx", "Private transaction received"); let private_tx: PrivateTransaction = Rlp::new(rlp).as_val()?; - let contract = private_tx.contract; + let private_tx_hash = private_tx.hash(); + let contract = private_tx.contract(); let contract_validators = self.get_validators(BlockId::Latest, &contract)?; let validation_account = contract_validators .iter() .find(|address| self.validator_accounts.contains(address)); - match validation_account { - None => { - // TODO [ToDr] This still seems a bit invalid, imho we should still import the transaction to the pool. - // Importing to pool verifies correctness and nonce; here we are just blindly forwarding. - // - // Not for verification, broadcast further to peers - self.broadcast_private_transaction(rlp.into()); - return Ok(()); - }, - Some(&validation_account) => { - let hash = private_tx.hash(); - trace!("Private transaction taken for verification"); - let original_tx = self.extract_original_transaction(private_tx, &contract)?; - trace!("Validating transaction: {:?}", original_tx); - // Verify with the first account available - trace!("The following account will be used for verification: {:?}", validation_account); - let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE); - self.transactions_for_verification.lock().add_transaction( - original_tx, - contract, - validation_account, - hash, - self.pool_client(&nonce_cache), - )?; - let provider = Arc::downgrade(self); - self.channel.send(ClientIoMessage::execute(move |_| { - if let Some(provider) = provider.upgrade() { - if let Err(e) = provider.process_queue() { - debug!("Unable to process the queue: {}", e); - } - } - })).map_err(|_| ErrorKind::ClientIsMalformed.into()) + //extract the original transaction + let encrypted_data = private_tx.encrypted(); + let transaction_bytes = self.decrypt(&contract, &encrypted_data)?; + let original_tx: UnverifiedTransaction = Rlp::new(&transaction_bytes).as_val()?; + let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE); + //add to the queue for further verification + self.transactions_for_verification.add_transaction( + original_tx, + validation_account.map(|&account| account), + private_tx, + self.pool_client(&nonce_cache), + )?; + let provider = Arc::downgrade(self); + let result = self.channel.send(ClientIoMessage::execute(move |_| { + if let Some(provider) = provider.upgrade() { + if let Err(e) = provider.process_verification_queue() { + warn!(target: "privatetx", "Unable to process the queue: {}", e); + } } + })); + if let Err(e) = result { + warn!(target: "privatetx", "Error sending NewPrivateTransaction message: {:?}", e); } + Ok(private_tx_hash) } - fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> { + fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result { let tx: SignedPrivateTransaction = Rlp::new(rlp).as_val()?; - trace!("Signature for private transaction received: {:?}", tx); + trace!(target: "privatetx", "Signature for private transaction received: {:?}", tx); let private_hash = tx.private_transaction_hash(); - let desc = match self.transactions_for_signing.lock().get(&private_hash) { - None => { - // TODO [ToDr] Verification (we can't just blindly forward every transaction) - - // Not our transaction, broadcast further to peers - self.broadcast_signed_private_transaction(rlp.into()); - return Ok(()); - }, - Some(desc) => desc, - }; - - let last = self.last_required_signature(&desc, tx.signature())?; - - if last { - let mut signatures = desc.received_signatures.clone(); - signatures.push(tx.signature()); - let rsv: Vec = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect(); - //Create public transaction - let public_tx = self.public_transaction( - desc.state.clone(), - &desc.original_transaction, - &rsv, - desc.original_transaction.nonce, - desc.original_transaction.gas_price - )?; - trace!("Last required signature received, public transaction created: {:?}", public_tx); - //Sign and add it to the queue - let chain_id = desc.original_transaction.chain_id(); - let hash = public_tx.hash(chain_id); - let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?; - let password = find_account_password(&self.passwords, &*self.accounts, &signer_account); - let signature = self.accounts.sign(signer_account, password, hash)?; - let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?; - match self.miner.import_own_transaction(&*self.client, signed.into()) { - Ok(_) => trace!("Public transaction added to queue"), - Err(err) => { - trace!("Failed to add transaction to queue, error: {:?}", err); - bail!(err); - } - } - //Remove from store for signing - match self.transactions_for_signing.lock().remove(&private_hash) { - Ok(_) => {} - Err(err) => { - trace!("Failed to remove transaction from signing store, error: {:?}", err); - bail!(err); - } - } - } else { - //Add signature to the store - match self.transactions_for_signing.lock().add_signature(&private_hash, tx.signature()) { - Ok(_) => trace!("Signature stored for private transaction"), - Err(err) => { - trace!("Failed to add signature to signing store, error: {:?}", err); - bail!(err); + let provider = Arc::downgrade(self); + let result = self.channel.send(ClientIoMessage::execute(move |_| { + if let Some(provider) = provider.upgrade() { + if let Err(e) = provider.process_signature(&tx) { + warn!(target: "privatetx", "Unable to process the signature: {}", e); } } + })); + if let Err(e) = result { + warn!(target: "privatetx", "Error sending NewSignedPrivateTransaction message: {:?}", e); } - Ok(()) + Ok(private_hash) } } @@ -689,9 +697,9 @@ fn find_account_password(passwords: &Vec, account_provider: &AccountPr impl ChainNotify for Provider { fn new_blocks(&self, imported: Vec, _invalid: Vec, _route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { if !imported.is_empty() { - trace!("New blocks imported, try to prune the queue"); - if let Err(err) = self.process_queue() { - trace!("Cannot prune private transactions queue. error: {:?}", err); + trace!(target: "privatetx", "New blocks imported, try to prune the queue"); + if let Err(err) = self.process_verification_queue() { + warn!(target: "privatetx", "Cannot prune private transactions queue. error: {:?}", err); } } } diff --git a/ethcore/private-tx/src/messages.rs b/ethcore/private-tx/src/messages.rs index 57362e7ce..c0825fb59 100644 --- a/ethcore/private-tx/src/messages.rs +++ b/ethcore/private-tx/src/messages.rs @@ -25,15 +25,41 @@ use transaction::signature::{add_chain_replay_protection, check_replay_protectio #[derive(Default, Debug, Clone, PartialEq, RlpEncodable, RlpDecodable, Eq)] pub struct PrivateTransaction { /// Encrypted data - pub encrypted: Bytes, + encrypted: Bytes, /// Address of the contract - pub contract: Address, + contract: Address, + /// Hash + hash: H256, } impl PrivateTransaction { - /// Compute hash on private transaction + /// Constructor + pub fn new(encrypted: Bytes, contract: Address) -> Self { + PrivateTransaction { + encrypted, + contract, + hash: 0.into(), + }.compute_hash() + } + + fn compute_hash(mut self) -> PrivateTransaction { + self.hash = keccak(&*self.rlp_bytes()); + self + } + + /// Hash of the private transaction pub fn hash(&self) -> H256 { - keccak(&*self.rlp_bytes()) + self.hash + } + + /// Address of the contract + pub fn contract(&self) -> Address { + self.contract + } + + /// Encrypted data + pub fn encrypted(&self) -> Bytes { + self.encrypted.clone() } } @@ -49,6 +75,8 @@ pub struct SignedPrivateTransaction { r: U256, /// The S field of the signature s: U256, + /// Hash + hash: H256, } impl SignedPrivateTransaction { @@ -59,7 +87,13 @@ impl SignedPrivateTransaction { r: sig.r().into(), s: sig.s().into(), v: add_chain_replay_protection(sig.v() as u64, chain_id), - } + hash: 0.into(), + }.compute_hash() + } + + fn compute_hash(mut self) -> SignedPrivateTransaction { + self.hash = keccak(&*self.rlp_bytes()); + self } pub fn standard_v(&self) -> u8 { check_replay_protection(self.v) } @@ -73,4 +107,9 @@ impl SignedPrivateTransaction { pub fn private_transaction_hash(&self) -> H256 { self.private_transaction_hash } + + /// Own hash + pub fn hash(&self) -> H256 { + self.hash + } } diff --git a/ethcore/private-tx/src/private_transactions.rs b/ethcore/private-tx/src/private_transactions.rs index e16d6ab91..a0f58f9ca 100644 --- a/ethcore/private-tx/src/private_transactions.rs +++ b/ethcore/private-tx/src/private_transactions.rs @@ -15,62 +15,139 @@ // along with Parity. If not, see . use std::sync::Arc; +use std::cmp; use std::collections::{HashMap, HashSet}; use bytes::Bytes; use ethcore_miner::pool; use ethereum_types::{H256, U256, Address}; +use heapsize::HeapSizeOf; use ethkey::Signature; +use messages::PrivateTransaction; +use parking_lot::RwLock; use transaction::{UnverifiedTransaction, SignedTransaction}; - +use txpool; +use txpool::{VerifiedTransaction, Verifier}; use error::{Error, ErrorKind}; +type Pool = txpool::Pool; + /// Maximum length for private transactions queues. const MAX_QUEUE_LEN: usize = 8312; -/// Desriptor for private transaction stored in queue for verification -#[derive(Default, Debug, Clone, PartialEq, Eq)] -pub struct PrivateTransactionDesc { - /// Hash of the private transaction - pub private_hash: H256, - /// Contract's address used in private transaction - pub contract: Address, +/// Private transaction stored in queue for verification +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct VerifiedPrivateTransaction { + /// Original private transaction + pub private_transaction: PrivateTransaction, /// Address that should be used for verification - pub validator_account: Address, + pub validator_account: Option
, + /// Resulting verified transaction + pub transaction: SignedTransaction, + /// Original transaction hash + pub transaction_hash: H256, + /// Original transaction sender + pub transaction_sender: Address, +} + +impl txpool::VerifiedTransaction for VerifiedPrivateTransaction { + type Hash = H256; + type Sender = Address; + + fn hash(&self) -> &H256 { + &self.transaction_hash + } + + fn mem_usage(&self) -> usize { + self.transaction.heap_size_of_children() + } + + fn sender(&self) -> &Address { + &self.transaction_sender + } +} + +impl pool::ScoredTransaction for VerifiedPrivateTransaction { + fn priority(&self) -> pool::Priority { + pool::Priority::Regular + } + + /// Gets transaction gas price. + fn gas_price(&self) -> &U256 { + &self.transaction.gas_price + } + + /// Gets transaction nonce. + fn nonce(&self) -> U256 { + self.transaction.nonce + } +} + +/// Checks readiness of transactions by looking if the transaction from sender already exists. +/// Guarantees only one transaction per sender +#[derive(Debug)] +pub struct PrivateReadyState { + senders: HashSet
, + state: C, +} + +impl PrivateReadyState { + /// Create new State checker, given client interface. + pub fn new( + state: C, + ) -> Self { + PrivateReadyState { + senders: Default::default(), + state, + } + } +} + +impl txpool::Ready for PrivateReadyState { + fn is_ready(&mut self, tx: &VerifiedPrivateTransaction) -> txpool::Readiness { + let sender = tx.sender(); + let state = &self.state; + let state_nonce = state.account_nonce(sender); + if self.senders.contains(sender) { + txpool::Readiness::Future + } else { + self.senders.insert(*sender); + match tx.transaction.nonce.cmp(&state_nonce) { + cmp::Ordering::Greater => txpool::Readiness::Future, + cmp::Ordering::Less => txpool::Readiness::Stale, + cmp::Ordering::Equal => txpool::Readiness::Ready, + } + } + } } /// Storage for private transactions for verification pub struct VerificationStore { - /// Descriptors for private transactions in queue for verification with key - hash of the original transaction - descriptors: HashMap, - /// Queue with transactions for verification - /// - /// TODO [ToDr] Might actually be better to use `txpool` directly and: - /// 1. Store descriptors inside `VerifiedTransaction` - /// 2. Use custom `ready` implementation to only fetch one transaction per sender. - /// 3. Get rid of passing dummy `block_number` and `timestamp` - transactions: pool::TransactionQueue, + verification_pool: RwLock, + verification_options: pool::verifier::Options, } impl Default for VerificationStore { fn default() -> Self { VerificationStore { - descriptors: Default::default(), - transactions: pool::TransactionQueue::new( - pool::Options { - max_count: MAX_QUEUE_LEN, - max_per_sender: MAX_QUEUE_LEN / 10, - max_mem_usage: 8 * 1024 * 1024, - }, - pool::verifier::Options { - // TODO [ToDr] This should probably be based on some real values? - minimal_gas_price: 0.into(), - block_gas_limit: 8_000_000.into(), - tx_gas_limit: U256::max_value(), - no_early_reject: false - }, - pool::PrioritizationStrategy::GasPriceOnly, - ) + verification_pool: RwLock::new( + txpool::Pool::new( + txpool::NoopListener, + pool::scoring::NonceAndGasPrice(pool::PrioritizationStrategy::GasPriceOnly), + pool::Options { + max_count: MAX_QUEUE_LEN, + max_per_sender: MAX_QUEUE_LEN / 10, + max_mem_usage: 8 * 1024 * 1024, + }, + ) + ), + verification_options: pool::verifier::Options { + // TODO [ToDr] This should probably be based on some real values? + minimal_gas_price: 0.into(), + block_gas_limit: 8_000_000.into(), + tx_gas_limit: U256::max_value(), + no_early_reject: false, + }, } } } @@ -78,66 +155,43 @@ impl Default for VerificationStore { impl VerificationStore { /// Adds private transaction for verification into the store pub fn add_transaction( - &mut self, + &self, transaction: UnverifiedTransaction, - contract: Address, - validator_account: Address, - private_hash: H256, + validator_account: Option
, + private_transaction: PrivateTransaction, client: C, ) -> Result<(), Error> { - if self.descriptors.len() > MAX_QUEUE_LEN { - bail!(ErrorKind::QueueIsFull); - } - let transaction_hash = transaction.hash(); - if self.descriptors.get(&transaction_hash).is_some() { - bail!(ErrorKind::PrivateTransactionAlreadyImported); - } - - let results = self.transactions.import( - client, - vec![pool::verifier::Transaction::Unverified(transaction)], - ); - - // Verify that transaction was imported - results.into_iter() - .next() - .expect("One transaction inserted; one result returned; qed")?; - - self.descriptors.insert(transaction_hash, PrivateTransactionDesc { - private_hash, - contract, + let options = self.verification_options.clone(); + // Use pool's verifying pipeline for original transaction's verification + let verifier = pool::verifier::Verifier::new(client, options, Default::default(), None); + let unverified = pool::verifier::Transaction::Unverified(transaction); + let verified_tx = verifier.verify_transaction(unverified)?; + let signed_tx: SignedTransaction = verified_tx.signed().clone(); + let signed_hash = signed_tx.hash(); + let signed_sender = signed_tx.sender(); + let verified = VerifiedPrivateTransaction { + private_transaction, validator_account, - }); - + transaction: signed_tx, + transaction_hash: signed_hash, + transaction_sender: signed_sender, + }; + let mut pool = self.verification_pool.write(); + pool.import(verified)?; Ok(()) } - /// Returns transactions ready for verification + /// Drains transactions ready for verification from the pool /// Returns only one transaction per sender because several cannot be verified in a row without verification from other peers - pub fn ready_transactions(&self, client: C) -> Vec> { - // We never store PendingTransactions and we don't use internal cache, - // so we don't need to provide real block number of timestamp here - let block_number = 0; - let timestamp = 0; - let nonce_cap = None; - - self.transactions.collect_pending(client, block_number, timestamp, nonce_cap, |transactions| { - // take only one transaction per sender - let mut senders = HashSet::with_capacity(self.descriptors.len()); - transactions.filter(move |tx| senders.insert(tx.signed().sender())).collect() - }) - } - - /// Returns descriptor of the corresponding private transaction - pub fn private_transaction_descriptor(&self, transaction_hash: &H256) -> Result<&PrivateTransactionDesc, Error> { - self.descriptors.get(transaction_hash).ok_or(ErrorKind::PrivateTransactionNotFound.into()) - } - - /// Remove transaction from the queue for verification - pub fn remove_private_transaction(&mut self, transaction_hash: &H256) { - self.descriptors.remove(transaction_hash); - self.transactions.remove(&[*transaction_hash], true); + pub fn drain(&self, client: C) -> Vec> { + let ready = PrivateReadyState::new(client); + let transactions: Vec<_> = self.verification_pool.read().pending(ready).collect(); + let mut pool = self.verification_pool.write(); + for tx in &transactions { + pool.remove(tx.hash(), true); + } + transactions } } diff --git a/ethcore/service/Cargo.toml b/ethcore/service/Cargo.toml index e1ba1c81f..ce445f6d9 100644 --- a/ethcore/service/Cargo.toml +++ b/ethcore/service/Cargo.toml @@ -10,6 +10,7 @@ ethcore = { path = ".." } ethcore-io = { path = "../../util/io" } ethcore-private-tx = { path = "../private-tx" } ethcore-sync = { path = "../sync" } +ethereum-types = "0.3" kvdb = { git = "https://github.com/paritytech/parity-common" } log = "0.4" stop-guard = { path = "../../util/stop-guard" } diff --git a/ethcore/service/src/lib.rs b/ethcore/service/src/lib.rs index d85a377cd..7ded2af79 100644 --- a/ethcore/service/src/lib.rs +++ b/ethcore/service/src/lib.rs @@ -19,6 +19,7 @@ extern crate ethcore; extern crate ethcore_io as io; extern crate ethcore_private_tx; extern crate ethcore_sync as sync; +extern crate ethereum_types; extern crate kvdb; extern crate stop_guard; diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index 1ffb0d621..a6bbc4e10 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -21,6 +21,7 @@ use std::path::Path; use std::time::Duration; use ansi_term::Colour; +use ethereum_types::H256; use io::{IoContext, TimerToken, IoHandler, IoService, IoError}; use stop_guard::StopGuard; @@ -54,12 +55,24 @@ impl PrivateTxService { } impl PrivateTxHandler for PrivateTxService { - fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), String> { - self.provider.import_private_transaction(rlp).map_err(|e| e.to_string()) + fn import_private_transaction(&self, rlp: &[u8]) -> Result { + match self.provider.import_private_transaction(rlp) { + Ok(import_result) => Ok(import_result), + Err(err) => { + warn!(target: "privatetx", "Unable to import private transaction packet: {}", err); + bail!(err.to_string()) + } + } } - fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), String> { - self.provider.import_signed_private_transaction(rlp).map_err(|e| e.to_string()) + fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result { + match self.provider.import_signed_private_transaction(rlp) { + Ok(import_result) => Ok(import_result), + Err(err) => { + warn!(target: "privatetx", "Unable to import signed private transaction packet: {}", err); + bail!(err.to_string()) + } + } } } diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 62de03591..ebfe7bdef 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -26,9 +26,9 @@ pub enum ChainMessageType { /// Consensus message Consensus(Vec), /// Message with private transaction - PrivateTransaction(Vec), + PrivateTransaction(H256, Vec), /// Message with signed private transaction - SignedPrivateTransaction(Vec), + SignedPrivateTransaction(H256, Vec), } /// Route type to indicate whether it is enacted or retracted. diff --git a/ethcore/src/test_helpers.rs b/ethcore/src/test_helpers.rs index c873db5d1..06cfe7030 100644 --- a/ethcore/src/test_helpers.rs +++ b/ethcore/src/test_helpers.rs @@ -490,8 +490,8 @@ impl ChainNotify for TestNotify { fn broadcast(&self, message: ChainMessageType) { let data = match message { ChainMessageType::Consensus(data) => data, - ChainMessageType::SignedPrivateTransaction(data) => data, - ChainMessageType::PrivateTransaction(data) => data, + ChainMessageType::SignedPrivateTransaction(_, data) => data, + ChainMessageType::PrivateTransaction(_, data) => data, }; self.messages.write().push(data); } diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 606aa39b3..5128df3b2 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -38,7 +38,8 @@ use std::net::{SocketAddr, AddrParseError}; use std::str::FromStr; use parking_lot::RwLock; use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62, - PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3}; + PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, + PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET}; use light::client::AsLightClient; use light::Provider; use light::net::{ @@ -522,8 +523,10 @@ impl ChainNotify for EthSync { let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service, &self.eth_handler.overlay); match message_type { ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message), - ChainMessageType::PrivateTransaction(message) => self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, message), - ChainMessageType::SignedPrivateTransaction(message) => self.eth_handler.sync.write().propagate_signed_private_transaction(&mut sync_io, message), + ChainMessageType::PrivateTransaction(transaction_hash, message) => + self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PRIVATE_TRANSACTION_PACKET, message), + ChainMessageType::SignedPrivateTransaction(transaction_hash, message) => + self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, message), } }); } diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 8547be7b3..c30c60a7c 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -552,6 +552,7 @@ impl SyncHandler { asking_hash: None, ask_time: Instant::now(), last_sent_transactions: HashSet::new(), + last_sent_private_transactions: HashSet::new(), expired: false, confirmation: if sync.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed }, asking_snapshot_data: None, @@ -631,21 +632,29 @@ impl SyncHandler { } /// Called when peer sends us signed private transaction packet - fn on_signed_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { + fn on_signed_private_transaction(sync: &mut ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id); return Ok(()); } trace!(target: "sync", "Received signed private transaction packet from {:?}", peer_id); - if let Err(e) = sync.private_tx_handler.import_signed_private_transaction(r.as_raw()) { - trace!(target: "sync", "Ignoring the message, error queueing: {}", e); - } + match sync.private_tx_handler.import_signed_private_transaction(r.as_raw()) { + Ok(transaction_hash) => { + //don't send the packet back + if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { + peer.last_sent_private_transactions.insert(transaction_hash); + } + }, + Err(e) => { + trace!(target: "sync", "Ignoring the message, error queueing: {}", e); + } + } Ok(()) } /// Called when peer sends us new private transaction packet - fn on_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { + fn on_private_transaction(sync: &mut ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id); return Ok(()); @@ -653,9 +662,17 @@ impl SyncHandler { trace!(target: "sync", "Received private transaction packet from {:?}", peer_id); - if let Err(e) = sync.private_tx_handler.import_private_transaction(r.as_raw()) { - trace!(target: "sync", "Ignoring the message, error queueing: {}", e); - } + match sync.private_tx_handler.import_private_transaction(r.as_raw()) { + Ok(transaction_hash) => { + //don't send the packet back + if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { + peer.last_sent_private_transactions.insert(transaction_hash); + } + }, + Err(e) => { + trace!(target: "sync", "Ignoring the message, error queueing: {}", e); + } + } Ok(()) } } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index b53c38b43..06bd3feba 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -173,8 +173,8 @@ pub const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12; pub const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13; pub const SNAPSHOT_DATA_PACKET: u8 = 0x14; pub const CONSENSUS_DATA_PACKET: u8 = 0x15; -const PRIVATE_TRANSACTION_PACKET: u8 = 0x16; -const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17; +pub const PRIVATE_TRANSACTION_PACKET: u8 = 0x16; +pub const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17; const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3; @@ -324,6 +324,8 @@ pub struct PeerInfo { ask_time: Instant, /// Holds a set of transactions recently sent to this peer to avoid spamming. last_sent_transactions: HashSet, + /// Holds a set of private transactions and their signatures recently sent to this peer to avoid spamming. + last_sent_private_transactions: HashSet, /// Pending request is expired and result should be ignored expired: bool, /// Peer fork confirmation status @@ -353,6 +355,10 @@ impl PeerInfo { self.expired = true; } } + + fn reset_private_stats(&mut self) { + self.last_sent_private_transactions.clear(); + } } #[cfg(not(test))] @@ -1056,8 +1062,15 @@ impl ChainSync { self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2.0 { Some(*id) } else { None }).collect() } - fn get_private_transaction_peers(&self) -> Vec { - self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0 { Some(*id) } else { None }).collect() + fn get_private_transaction_peers(&self, transaction_hash: &H256) -> Vec { + self.peers.iter().filter_map( + |(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0 + && !p.last_sent_private_transactions.contains(transaction_hash) { + Some(*id) + } else { + None + } + ).collect() } /// Maintain other peers. Send out any new blocks and transactions @@ -1085,8 +1098,10 @@ impl ChainSync { // Select random peer to re-broadcast transactions to. let peer = random::new().gen_range(0, self.peers.len()); trace!(target: "sync", "Re-broadcasting transactions to a random peer."); - self.peers.values_mut().nth(peer).map(|peer_info| - peer_info.last_sent_transactions.clear() + self.peers.values_mut().nth(peer).map(|peer_info| { + peer_info.last_sent_transactions.clear(); + peer_info.reset_private_stats() + } ); } } @@ -1127,13 +1142,8 @@ impl ChainSync { } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, packet: Bytes) { - SyncPropagator::propagate_private_transaction(self, io, packet); - } - - /// Broadcast signed private transaction message to peers. - pub fn propagate_signed_private_transaction(&mut self, io: &mut SyncIo, packet: Bytes) { - SyncPropagator::propagate_signed_private_transaction(self, io, packet); + pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) { + SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet); } } @@ -1256,6 +1266,7 @@ pub mod tests { asking_hash: None, ask_time: Instant::now(), last_sent_transactions: HashSet::new(), + last_sent_private_transactions: HashSet::new(), expired: false, confirmation: super::ForkConfirmation::Confirmed, snapshot_number: None, diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index aabe90c93..102a31712 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -36,8 +36,6 @@ use super::{ CONSENSUS_DATA_PACKET, NEW_BLOCK_HASHES_PACKET, NEW_BLOCK_PACKET, - PRIVATE_TRANSACTION_PACKET, - SIGNED_PRIVATE_TRANSACTION_PACKET, TRANSACTIONS_PACKET, }; @@ -293,20 +291,14 @@ impl SyncPropagator { } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, packet: Bytes) { - let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers()); + pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) { + let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash)); trace!(target: "sync", "Sending private transaction packet to {:?}", lucky_peers); for peer_id in lucky_peers { - SyncPropagator::send_packet(io, peer_id, PRIVATE_TRANSACTION_PACKET, packet.clone()); - } - } - - /// Broadcast signed private transaction message to peers. - pub fn propagate_signed_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, packet: Bytes) { - let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers()); - trace!(target: "sync", "Sending signed private transaction packet to {:?}", lucky_peers); - for peer_id in lucky_peers { - SyncPropagator::send_packet(io, peer_id, SIGNED_PRIVATE_TRANSACTION_PACKET, packet.clone()); + if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { + peer.last_sent_private_transactions.insert(transaction_hash); + } + SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone()); } } @@ -428,6 +420,7 @@ mod tests { asking_hash: None, ask_time: Instant::now(), last_sent_transactions: HashSet::new(), + last_sent_private_transactions: HashSet::new(), expired: false, confirmation: ForkConfirmation::Confirmed, snapshot_number: None, diff --git a/ethcore/sync/src/private_tx.rs b/ethcore/sync/src/private_tx.rs index d7434c8bd..03928c22d 100644 --- a/ethcore/sync/src/private_tx.rs +++ b/ethcore/sync/src/private_tx.rs @@ -15,26 +15,29 @@ // along with Parity. If not, see . use parking_lot::Mutex; +use ethereum_types::H256; /// Trait which should be implemented by a private transaction handler. pub trait PrivateTxHandler: Send + Sync + 'static { /// Function called on new private transaction received. - fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), String>; + /// Returns the hash of the imported transaction + fn import_private_transaction(&self, rlp: &[u8]) -> Result; /// Function called on new signed private transaction received. - fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), String>; + /// Returns the hash of the imported transaction + fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result; } /// Nonoperative private transaction handler. pub struct NoopPrivateTxHandler; impl PrivateTxHandler for NoopPrivateTxHandler { - fn import_private_transaction(&self, _rlp: &[u8]) -> Result<(), String> { - Ok(()) + fn import_private_transaction(&self, _rlp: &[u8]) -> Result { + Ok(H256::default()) } - fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<(), String> { - Ok(()) + fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result { + Ok(H256::default()) } } @@ -48,13 +51,13 @@ pub struct SimplePrivateTxHandler { } impl PrivateTxHandler for SimplePrivateTxHandler { - fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), String> { + fn import_private_transaction(&self, rlp: &[u8]) -> Result { self.txs.lock().push(rlp.to_vec()); - Ok(()) + Ok(H256::default()) } - fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), String> { + fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result { self.signed_txs.lock().push(rlp.to_vec()); - Ok(()) + Ok(H256::default()) } } diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 59db57dc5..d75d71ea9 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -33,7 +33,7 @@ use ethcore::test_helpers; use sync_io::SyncIo; use io::{IoChannel, IoContext, IoHandler}; use api::WARP_SYNC_PROTOCOL_ID; -use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3}; +use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET}; use SyncConfig; use private_tx::SimplePrivateTxHandler; @@ -230,8 +230,10 @@ impl EthPeer where C: FlushingBlockChainClient { let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); match message { ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data), - ChainMessageType::PrivateTransaction(data) => self.sync.write().propagate_private_transaction(&mut io, data), - ChainMessageType::SignedPrivateTransaction(data) => self.sync.write().propagate_signed_private_transaction(&mut io, data), + ChainMessageType::PrivateTransaction(transaction_hash, data) => + self.sync.write().propagate_private_transaction(&mut io, transaction_hash, PRIVATE_TRANSACTION_PACKET, data), + ChainMessageType::SignedPrivateTransaction(transaction_hash, data) => + self.sync.write().propagate_private_transaction(&mut io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, data), } } diff --git a/ethcore/sync/src/tests/private.rs b/ethcore/sync/src/tests/private.rs index 04b414b94..9b39aed76 100644 --- a/ethcore/sync/src/tests/private.rs +++ b/ethcore/sync/src/tests/private.rs @@ -24,11 +24,12 @@ use ethcore::CreateContractAddress; use transaction::{Transaction, Action}; use ethcore::executive::{contract_address}; use ethcore::test_helpers::{push_block_with_transactions}; -use ethcore_private_tx::{Provider, ProviderConfig, NoopEncryptor, Importer}; +use ethcore_private_tx::{Provider, ProviderConfig, NoopEncryptor, Importer, SignedPrivateTransaction}; use ethcore::account_provider::AccountProvider; use ethkey::{KeyPair}; use tests::helpers::{TestNet, TestIoHandler}; use rustc_hex::FromHex; +use rlp::Rlp; use SyncConfig; fn seal_spec() -> Spec { @@ -144,6 +145,8 @@ fn send_private_transaction() { //process signed response let signed_private_transaction = received_signed_private_transactions[0].clone(); assert!(pm0.import_signed_private_transaction(&signed_private_transaction).is_ok()); + let signature: SignedPrivateTransaction = Rlp::new(&signed_private_transaction).as_val().unwrap(); + assert!(pm0.process_signature(&signature).is_ok()); let local_transactions = net.peer(0).miner.local_transactions(); assert_eq!(local_transactions.len(), 1); } diff --git a/miner/src/pool/local_transactions.rs b/miner/src/pool/local_transactions.rs index a1c69ef22..a71d9244c 100644 --- a/miner/src/pool/local_transactions.rs +++ b/miner/src/pool/local_transactions.rs @@ -20,7 +20,7 @@ use std::{fmt, sync::Arc}; use ethereum_types::H256; use linked_hash_map::LinkedHashMap; -use pool::VerifiedTransaction as Transaction; +use pool::{VerifiedTransaction as Transaction, ScoredTransaction}; use txpool::{self, VerifiedTransaction}; /// Status of local transaction. diff --git a/miner/src/pool/mod.rs b/miner/src/pool/mod.rs index 4a1223226..ccfbba7f8 100644 --- a/miner/src/pool/mod.rs +++ b/miner/src/pool/mod.rs @@ -24,10 +24,10 @@ use txpool; mod listener; mod queue; mod ready; -mod scoring; pub mod client; pub mod local_transactions; +pub mod scoring; pub mod verifier; #[cfg(test)] @@ -84,7 +84,7 @@ impl PendingSettings { /// Transaction priority. #[derive(Debug, PartialEq, Eq, PartialOrd, Clone, Copy)] -pub(crate) enum Priority { +pub enum Priority { /// Regular transactions received over the network. (no priority boost) Regular, /// Transactions from retracted blocks (medium priority) @@ -108,6 +108,18 @@ impl Priority { } } +/// Scoring properties for verified transaction. +pub trait ScoredTransaction { + /// Gets transaction priority. + fn priority(&self) -> Priority; + + /// Gets transaction gas price. + fn gas_price(&self) -> &U256; + + /// Gets transaction nonce. + fn nonce(&self) -> U256; +} + /// Verified transaction stored in the pool. #[derive(Debug, PartialEq, Eq)] pub struct VerifiedTransaction { @@ -137,11 +149,6 @@ impl VerifiedTransaction { } } - /// Gets transaction priority. - pub(crate) fn priority(&self) -> Priority { - self.priority - } - /// Gets transaction insertion id. pub(crate) fn insertion_id(&self) -> usize { self.insertion_id @@ -175,3 +182,19 @@ impl txpool::VerifiedTransaction for VerifiedTransaction { &self.sender } } + +impl ScoredTransaction for VerifiedTransaction { + fn priority(&self) -> Priority { + self.priority + } + + /// Gets transaction gas price. + fn gas_price(&self) -> &U256 { + &self.transaction.gas_price + } + + /// Gets transaction nonce. + fn nonce(&self) -> U256 { + self.transaction.nonce + } +} diff --git a/miner/src/pool/scoring.rs b/miner/src/pool/scoring.rs index dbe3c08f4..61fcf4e41 100644 --- a/miner/src/pool/scoring.rs +++ b/miner/src/pool/scoring.rs @@ -31,7 +31,7 @@ use std::cmp; use ethereum_types::U256; use txpool::{self, scoring}; -use super::{verifier, PrioritizationStrategy, VerifiedTransaction}; +use super::{verifier, PrioritizationStrategy, VerifiedTransaction, ScoredTransaction}; /// Transaction with the same (sender, nonce) can be replaced only if /// `new_gas_price > old_gas_price + old_gas_price >> SHIFT` @@ -67,23 +67,23 @@ impl NonceAndGasPrice { } } -impl txpool::Scoring for NonceAndGasPrice { +impl

txpool::Scoring

for NonceAndGasPrice where P: ScoredTransaction + txpool::VerifiedTransaction { type Score = U256; type Event = (); - fn compare(&self, old: &VerifiedTransaction, other: &VerifiedTransaction) -> cmp::Ordering { - old.transaction.nonce.cmp(&other.transaction.nonce) + fn compare(&self, old: &P, other: &P) -> cmp::Ordering { + old.nonce().cmp(&other.nonce()) } - fn choose(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> scoring::Choice { - if old.transaction.nonce != new.transaction.nonce { + fn choose(&self, old: &P, new: &P) -> scoring::Choice { + if old.nonce() != new.nonce() { return scoring::Choice::InsertNew } - let old_gp = old.transaction.gas_price; - let new_gp = new.transaction.gas_price; + let old_gp = old.gas_price(); + let new_gp = new.gas_price(); - let min_required_gp = bump_gas_price(old_gp); + let min_required_gp = bump_gas_price(*old_gp); match min_required_gp.cmp(&new_gp) { cmp::Ordering::Greater => scoring::Choice::RejectNew, @@ -91,7 +91,7 @@ impl txpool::Scoring for NonceAndGasPrice { } } - fn update_scores(&self, txs: &[txpool::Transaction], scores: &mut [U256], change: scoring::Change) { + fn update_scores(&self, txs: &[txpool::Transaction

], scores: &mut [U256], change: scoring::Change) { use self::scoring::Change; match change { @@ -101,7 +101,7 @@ impl txpool::Scoring for NonceAndGasPrice { assert!(i < txs.len()); assert!(i < scores.len()); - scores[i] = txs[i].transaction.transaction.gas_price; + scores[i] = *txs[i].transaction.gas_price(); let boost = match txs[i].priority() { super::Priority::Local => 15, super::Priority::Retracted => 10, @@ -122,10 +122,10 @@ impl txpool::Scoring for NonceAndGasPrice { } } - fn should_replace(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> scoring::Choice { - if old.sender == new.sender { + fn should_replace(&self, old: &P, new: &P) -> scoring::Choice { + if old.sender() == new.sender() { // prefer earliest transaction - match new.transaction.nonce.cmp(&old.transaction.nonce) { + match new.nonce().cmp(&old.nonce()) { cmp::Ordering::Less => scoring::Choice::ReplaceOld, cmp::Ordering::Greater => scoring::Choice::RejectNew, cmp::Ordering::Equal => self.choose(old, new), @@ -134,8 +134,8 @@ impl txpool::Scoring for NonceAndGasPrice { // accept local transactions over the limit scoring::Choice::InsertNew } else { - let old_score = (old.priority(), old.transaction.gas_price); - let new_score = (new.priority(), new.transaction.gas_price); + let old_score = (old.priority(), old.gas_price()); + let new_score = (new.priority(), new.gas_price()); if new_score > old_score { scoring::Choice::ReplaceOld } else { @@ -144,7 +144,7 @@ impl txpool::Scoring for NonceAndGasPrice { } } - fn should_ignore_sender_limit(&self, new: &VerifiedTransaction) -> bool { + fn should_ignore_sender_limit(&self, new: &P) -> bool { new.priority().is_local() } } @@ -185,12 +185,8 @@ mod tests { }; let keypair = Random.generate().unwrap(); - let txs = vec![tx1, tx2, tx3, tx4].into_iter().enumerate().map(|(i, tx)| { - let verified = tx.unsigned().sign(keypair.secret(), None).verified(); - txpool::Transaction { - insertion_id: i as u64, - transaction: Arc::new(verified), - } + let txs = vec![tx1, tx2, tx3, tx4].into_iter().map(|tx| { + tx.unsigned().sign(keypair.secret(), None).verified() }).collect::>(); assert_eq!(scoring.should_replace(&txs[0], &txs[1]), RejectNew); @@ -213,11 +209,7 @@ mod tests { gas_price: 1, ..Default::default() }; - let verified_tx = tx.signed().verified(); - txpool::Transaction { - insertion_id: 0, - transaction: Arc::new(verified_tx), - } + tx.signed().verified() }; let tx_regular_high_gas = { let tx = Tx { @@ -225,11 +217,7 @@ mod tests { gas_price: 10, ..Default::default() }; - let verified_tx = tx.signed().verified(); - txpool::Transaction { - insertion_id: 1, - transaction: Arc::new(verified_tx), - } + tx.signed().verified() }; let tx_local_low_gas = { let tx = Tx { @@ -239,10 +227,7 @@ mod tests { }; let mut verified_tx = tx.signed().verified(); verified_tx.priority = ::pool::Priority::Local; - txpool::Transaction { - insertion_id: 2, - transaction: Arc::new(verified_tx), - } + verified_tx }; let tx_local_high_gas = { let tx = Tx { @@ -252,10 +237,7 @@ mod tests { }; let mut verified_tx = tx.signed().verified(); verified_tx.priority = ::pool::Priority::Local; - txpool::Transaction { - insertion_id: 3, - transaction: Arc::new(verified_tx), - } + verified_tx }; assert_eq!(scoring.should_replace(&tx_regular_low_gas, &tx_regular_high_gas), ReplaceOld);