diff --git a/Cargo.lock b/Cargo.lock
index cf2abdd3e..88cebe3a8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -741,6 +741,7 @@ dependencies = [
"ethkey 0.3.0",
"fetch 0.1.0",
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
+ "heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"keccak-hash 0.1.2 (git+https://github.com/paritytech/parity-common)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-bytes 0.1.0 (git+https://github.com/paritytech/parity-common)",
@@ -756,6 +757,7 @@ dependencies = [
"serde_derive 1.0.37 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "transaction-pool 1.13.1",
"url 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -808,6 +810,7 @@ dependencies = [
"ethcore-io 1.12.0",
"ethcore-private-tx 1.0.0",
"ethcore-sync 1.12.0",
+ "ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"kvdb 0.1.0 (git+https://github.com/paritytech/parity-common)",
"kvdb-rocksdb 0.1.0 (git+https://github.com/paritytech/parity-common)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/ethcore/private-tx/Cargo.toml b/ethcore/private-tx/Cargo.toml
index 2383443e7..cebf388f1 100644
--- a/ethcore/private-tx/Cargo.toml
+++ b/ethcore/private-tx/Cargo.toml
@@ -22,6 +22,7 @@ ethjson = { path = "../../json" }
ethkey = { path = "../../ethkey" }
fetch = { path = "../../util/fetch" }
futures = "0.1"
+heapsize = "0.4"
keccak-hash = { git = "https://github.com/paritytech/parity-common" }
log = "0.4"
parking_lot = "0.6"
@@ -35,6 +36,7 @@ serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
tiny-keccak = "1.4"
+transaction-pool = { path = "../../transaction-pool" }
url = "1"
[dev-dependencies]
diff --git a/ethcore/private-tx/src/encryptor.rs b/ethcore/private-tx/src/encryptor.rs
index e64917add..c1d3d3fb8 100644
--- a/ethcore/private-tx/src/encryptor.rs
+++ b/ethcore/private-tx/src/encryptor.rs
@@ -208,7 +208,7 @@ impl Encryptor for SecretStoreEncryptor {
let key = match self.retrieve_key("", false, contract_address, &*accounts) {
Ok(key) => Ok(key),
Err(Error(ErrorKind::EncryptionKeyNotFound(_), _)) => {
- trace!("Key for account wasnt found in sstore. Creating. Address: {:?}", contract_address);
+ trace!(target: "privatetx", "Key for account wasnt found in sstore. Creating. Address: {:?}", contract_address);
self.retrieve_key(&format!("/{}", self.config.threshold), true, contract_address, &*accounts)
}
Err(err) => Err(err),
diff --git a/ethcore/private-tx/src/error.rs b/ethcore/private-tx/src/error.rs
index 55a75d6d9..99da149e4 100644
--- a/ethcore/private-tx/src/error.rs
+++ b/ethcore/private-tx/src/error.rs
@@ -21,12 +21,14 @@ use ethcore::account_provider::SignError;
use ethcore::error::{Error as EthcoreError, ExecutionError};
use transaction::Error as TransactionError;
use ethkey::Error as KeyError;
+use txpool::Error as TxPoolError;
error_chain! {
foreign_links {
Io(::std::io::Error) #[doc = "Error concerning the Rust standard library's IO subsystem."];
Decoder(DecoderError) #[doc = "RLP decoding error."];
Trie(TrieError) #[doc = "Error concerning TrieDBs."];
+ Txpool(TxPoolError) #[doc = "Tx pool error."];
}
errors {
diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs
index a661197da..24727fe0f 100644
--- a/ethcore/private-tx/src/lib.rs
+++ b/ethcore/private-tx/src/lib.rs
@@ -37,9 +37,11 @@ extern crate ethkey;
extern crate ethjson;
extern crate fetch;
extern crate futures;
+extern crate heapsize;
extern crate keccak_hash as hash;
extern crate parking_lot;
extern crate patricia_trie as trie;
+extern crate transaction_pool as txpool;
extern crate patricia_trie_ethereum as ethtrie;
extern crate rlp;
extern crate url;
@@ -61,7 +63,7 @@ extern crate rand;
extern crate ethcore_logger;
pub use encryptor::{Encryptor, SecretStoreEncryptor, EncryptorConfig, NoopEncryptor};
-pub use private_transactions::{PrivateTransactionDesc, VerificationStore, PrivateTransactionSigningDesc, SigningStore};
+pub use private_transactions::{VerifiedPrivateTransaction, VerificationStore, PrivateTransactionSigningDesc, SigningStore};
pub use messages::{PrivateTransaction, SignedPrivateTransaction};
pub use error::{Error, ErrorKind};
@@ -71,7 +73,7 @@ use std::time::Duration;
use ethereum_types::{H128, H256, U256, Address};
use hash::keccak;
use rlp::*;
-use parking_lot::{Mutex, RwLock};
+use parking_lot::RwLock;
use bytes::Bytes;
use ethkey::{Signature, recover, public_to_address};
use io::IoChannel;
@@ -128,9 +130,8 @@ pub struct Provider {
signer_account: Option
,
passwords: Vec,
notify: RwLock>>,
- transactions_for_signing: Mutex,
- // TODO [ToDr] Move the Mutex/RwLock inside `VerificationStore` after refactored to `drain`.
- transactions_for_verification: Mutex,
+ transactions_for_signing: RwLock,
+ transactions_for_verification: VerificationStore,
client: Arc,
miner: Arc,
accounts: Arc,
@@ -161,8 +162,8 @@ impl Provider where {
signer_account: config.signer_account,
passwords: config.passwords,
notify: RwLock::default(),
- transactions_for_signing: Mutex::default(),
- transactions_for_verification: Mutex::default(),
+ transactions_for_signing: RwLock::default(),
+ transactions_for_verification: VerificationStore::default(),
client,
miner,
accounts,
@@ -190,9 +191,9 @@ impl Provider where {
/// 3. Save it with state returned on prev step to the queue for signing
/// 4. Broadcast corresponding message to the chain
pub fn create_private_transaction(&self, signed_transaction: SignedTransaction) -> Result {
- trace!("Creating private transaction from regular transaction: {:?}", signed_transaction);
+ trace!(target: "privatetx", "Creating private transaction from regular transaction: {:?}", signed_transaction);
if self.signer_account.is_none() {
- trace!("Signing account not set");
+ warn!(target: "privatetx", "Signing account not set");
bail!(ErrorKind::SignerAccountNotSet);
}
let tx_hash = signed_transaction.hash();
@@ -203,10 +204,7 @@ impl Provider where {
Action::Call(contract) => {
let data = signed_transaction.rlp_bytes();
let encrypted_transaction = self.encrypt(&contract, &Self::iv_from_transaction(&signed_transaction), &data)?;
- let private = PrivateTransaction {
- encrypted: encrypted_transaction,
- contract,
- };
+ let private = PrivateTransaction::new(encrypted_transaction, contract);
// TODO [ToDr] Using BlockId::Latest is bad here,
// the block may change in the middle of execution
// causing really weird stuff to happen.
@@ -215,16 +213,16 @@ impl Provider where {
// in private-tx to avoid such mistakes.
let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?;
let private_state = self.execute_private_transaction(BlockId::Latest, &signed_transaction)?;
- trace!("Private transaction created, encrypted transaction: {:?}, private state: {:?}", private, private_state);
+ trace!(target: "privatetx", "Private transaction created, encrypted transaction: {:?}, private state: {:?}", private, private_state);
let contract_validators = self.get_validators(BlockId::Latest, &contract)?;
- trace!("Required validators: {:?}", contract_validators);
+ trace!(target: "privatetx", "Required validators: {:?}", contract_validators);
let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce);
- trace!("Hashed effective private state for sender: {:?}", private_state_hash);
- self.transactions_for_signing.lock().add_transaction(private.hash(), signed_transaction, contract_validators, private_state, contract_nonce)?;
- self.broadcast_private_transaction(private.rlp_bytes().into_vec());
+ trace!(target: "privatetx", "Hashed effective private state for sender: {:?}", private_state_hash);
+ self.transactions_for_signing.write().add_transaction(private.hash(), signed_transaction, contract_validators, private_state, contract_nonce)?;
+ self.broadcast_private_transaction(private.hash(), private.rlp_bytes().into_vec());
Ok(Receipt {
hash: tx_hash,
- contract_address: None,
+ contract_address: Some(contract),
status_code: 0,
})
}
@@ -240,14 +238,6 @@ impl Provider where {
keccak(&state_buf.as_ref())
}
- /// Extract signed transaction from private transaction
- fn extract_original_transaction(&self, private: PrivateTransaction, contract: &Address) -> Result {
- let encrypted_transaction = private.encrypted;
- let transaction_bytes = self.decrypt(contract, &encrypted_transaction)?;
- let original_transaction: UnverifiedTransaction = Rlp::new(&transaction_bytes).as_val()?;
- Ok(original_transaction)
- }
-
fn pool_client<'a>(&'a self, nonce_cache: &'a NonceCache) -> miner::pool_client::PoolClient<'a, Client> {
let engine = self.client.engine();
let refuse_service_transactions = true;
@@ -261,48 +251,122 @@ impl Provider where {
}
/// Retrieve and verify the first available private transaction for every sender
- ///
- /// TODO [ToDr] It seems that:
- /// The 3 methods `ready_transaction,get_descriptor,remove` are always used in conjuction so most likely
- /// can be replaced with a single `drain()` method instead.
- /// Thanks to this we also don't really need to lock the entire verification for the time of execution.
- fn process_queue(&self) -> Result<(), Error> {
+ fn process_verification_queue(&self) -> Result<(), Error> {
let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE);
- let mut verification_queue = self.transactions_for_verification.lock();
- let ready_transactions = verification_queue.ready_transactions(self.pool_client(&nonce_cache));
- for transaction in ready_transactions {
- let transaction_hash = transaction.signed().hash();
- match verification_queue.private_transaction_descriptor(&transaction_hash) {
- Ok(desc) => {
- if !self.validator_accounts.contains(&desc.validator_account) {
- trace!("Cannot find validator account in config");
- bail!(ErrorKind::ValidatorAccountNotSet);
+ let process_transaction = |transaction: &VerifiedPrivateTransaction| -> Result<_, String> {
+ let private_hash = transaction.private_transaction.hash();
+ match transaction.validator_account {
+ None => {
+ trace!(target: "privatetx", "Propagating transaction further");
+ self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes().into_vec());
+ return Ok(());
+ }
+ Some(validator_account) => {
+ if !self.validator_accounts.contains(&validator_account) {
+ trace!(target: "privatetx", "Propagating transaction further");
+ self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes().into_vec());
+ return Ok(());
}
- let account = desc.validator_account;
- if let Action::Call(contract) = transaction.signed().action {
+ let tx_action = transaction.transaction.action.clone();
+ if let Action::Call(contract) = tx_action {
// TODO [ToDr] Usage of BlockId::Latest
- let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?;
- let private_state = self.execute_private_transaction(BlockId::Latest, transaction.signed())?;
+ let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest);
+ if let Err(e) = contract_nonce {
+ bail!("Cannot retrieve contract nonce: {:?}", e);
+ }
+ let contract_nonce = contract_nonce.expect("Error was checked before");
+ let private_state = self.execute_private_transaction(BlockId::Latest, &transaction.transaction);
+ if let Err(e) = private_state {
+ bail!("Cannot retrieve private state: {:?}", e);
+ }
+ let private_state = private_state.expect("Error was checked before");
let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce);
- trace!("Hashed effective private state for validator: {:?}", private_state_hash);
- let password = find_account_password(&self.passwords, &*self.accounts, &account);
- let signed_state = self.accounts.sign(account, password, private_state_hash)?;
- let signed_private_transaction = SignedPrivateTransaction::new(desc.private_hash, signed_state, None);
- trace!("Sending signature for private transaction: {:?}", signed_private_transaction);
- self.broadcast_signed_private_transaction(signed_private_transaction.rlp_bytes().into_vec());
+ trace!(target: "privatetx", "Hashed effective private state for validator: {:?}", private_state_hash);
+ let password = find_account_password(&self.passwords, &*self.accounts, &validator_account);
+ let signed_state = self.accounts.sign(validator_account, password, private_state_hash);
+ if let Err(e) = signed_state {
+ bail!("Cannot sign the state: {:?}", e);
+ }
+ let signed_state = signed_state.expect("Error was checked before");
+ let signed_private_transaction = SignedPrivateTransaction::new(private_hash, signed_state, None);
+ trace!(target: "privatetx", "Sending signature for private transaction: {:?}", signed_private_transaction);
+ self.broadcast_signed_private_transaction(signed_private_transaction.hash(), signed_private_transaction.rlp_bytes().into_vec());
} else {
- warn!("Incorrect type of action for the transaction");
+ bail!("Incorrect type of action for the transaction");
}
- },
- Err(e) => {
- warn!("Cannot retrieve descriptor for transaction with error {:?}", e);
}
}
- verification_queue.remove_private_transaction(&transaction_hash);
+ Ok(())
+ };
+ let ready_transactions = self.transactions_for_verification.drain(self.pool_client(&nonce_cache));
+ for transaction in ready_transactions {
+ if let Err(e) = process_transaction(&transaction) {
+ warn!(target: "privatetx", "Error: {:?}", e);
+ }
}
Ok(())
}
+ /// Add signed private transaction into the store
+ /// Creates corresponding public transaction if last required signature collected and sends it to the chain
+ pub fn process_signature(&self, signed_tx: &SignedPrivateTransaction) -> Result<(), Error> {
+ trace!(target: "privatetx", "Processing signed private transaction");
+ let private_hash = signed_tx.private_transaction_hash();
+ let desc = match self.transactions_for_signing.read().get(&private_hash) {
+ None => {
+ // Not our transaction, broadcast further to peers
+ self.broadcast_signed_private_transaction(signed_tx.hash(), signed_tx.rlp_bytes().into_vec());
+ return Ok(());
+ },
+ Some(desc) => desc,
+ };
+ let last = self.last_required_signature(&desc, signed_tx.signature())?;
+
+ if last {
+ let mut signatures = desc.received_signatures.clone();
+ signatures.push(signed_tx.signature());
+ let rsv: Vec = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect();
+ //Create public transaction
+ let public_tx = self.public_transaction(
+ desc.state.clone(),
+ &desc.original_transaction,
+ &rsv,
+ desc.original_transaction.nonce,
+ desc.original_transaction.gas_price
+ )?;
+ trace!(target: "privatetx", "Last required signature received, public transaction created: {:?}", public_tx);
+ //Sign and add it to the queue
+ let chain_id = desc.original_transaction.chain_id();
+ let hash = public_tx.hash(chain_id);
+ let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?;
+ let password = find_account_password(&self.passwords, &*self.accounts, &signer_account);
+ let signature = self.accounts.sign(signer_account, password, hash)?;
+ let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?;
+ match self.miner.import_own_transaction(&*self.client, signed.into()) {
+ Ok(_) => trace!(target: "privatetx", "Public transaction added to queue"),
+ Err(err) => {
+ warn!(target: "privatetx", "Failed to add transaction to queue, error: {:?}", err);
+ bail!(err);
+ }
+ }
+ //Remove from store for signing
+ if let Err(err) = self.transactions_for_signing.write().remove(&private_hash) {
+ warn!(target: "privatetx", "Failed to remove transaction from signing store, error: {:?}", err);
+ bail!(err);
+ }
+ } else {
+ //Add signature to the store
+ match self.transactions_for_signing.write().add_signature(&private_hash, signed_tx.signature()) {
+ Ok(_) => trace!(target: "privatetx", "Signature stored for private transaction"),
+ Err(err) => {
+ warn!(target: "privatetx", "Failed to add signature to signing store, error: {:?}", err);
+ bail!(err);
+ }
+ }
+ }
+ Ok(())
+ }
+
fn last_required_signature(&self, desc: &PrivateTransactionSigningDesc, sign: Signature) -> Result {
if desc.received_signatures.contains(&sign) {
return Ok(false);
@@ -316,26 +380,26 @@ impl Provider where {
Ok(desc.received_signatures.len() + 1 == desc.validators.len())
}
false => {
- trace!("Sender's state doesn't correspond to validator's");
+ warn!(target: "privatetx", "Sender's state doesn't correspond to validator's");
bail!(ErrorKind::StateIncorrect);
}
}
}
Err(err) => {
- trace!("Sender's state doesn't correspond to validator's, error {:?}", err);
+ warn!(target: "privatetx", "Sender's state doesn't correspond to validator's, error {:?}", err);
bail!(err);
}
}
}
/// Broadcast the private transaction message to the chain
- fn broadcast_private_transaction(&self, message: Bytes) {
- self.notify(|notify| notify.broadcast(ChainMessageType::PrivateTransaction(message.clone())));
+ fn broadcast_private_transaction(&self, transaction_hash: H256, message: Bytes) {
+ self.notify(|notify| notify.broadcast(ChainMessageType::PrivateTransaction(transaction_hash, message.clone())));
}
/// Broadcast signed private transaction message to the chain
- fn broadcast_signed_private_transaction(&self, message: Bytes) {
- self.notify(|notify| notify.broadcast(ChainMessageType::SignedPrivateTransaction(message.clone())));
+ fn broadcast_signed_private_transaction(&self, transaction_hash: H256, message: Bytes) {
+ self.notify(|notify| notify.broadcast(ChainMessageType::SignedPrivateTransaction(transaction_hash, message.clone())));
}
fn iv_from_transaction(transaction: &SignedTransaction) -> H128 {
@@ -351,12 +415,12 @@ impl Provider where {
}
fn encrypt(&self, contract_address: &Address, initialisation_vector: &H128, data: &[u8]) -> Result {
- trace!("Encrypt data using key(address): {:?}", contract_address);
+ trace!(target: "privatetx", "Encrypt data using key(address): {:?}", contract_address);
Ok(self.encryptor.encrypt(contract_address, &*self.accounts, initialisation_vector, data)?)
}
fn decrypt(&self, contract_address: &Address, data: &[u8]) -> Result {
- trace!("Decrypt data using key(address): {:?}", contract_address);
+ trace!(target: "privatetx", "Decrypt data using key(address): {:?}", contract_address);
Ok(self.encryptor.decrypt(contract_address, &*self.accounts, data)?)
}
@@ -421,7 +485,7 @@ impl Provider where {
Action::Call(ref contract_address) => {
let contract_code = Arc::new(self.get_decrypted_code(contract_address, block)?);
let contract_state = self.get_decrypted_state(contract_address, block)?;
- trace!("Patching contract at {:?}, code: {:?}, state: {:?}", contract_address, contract_code, contract_state);
+ trace!(target: "privatetx", "Patching contract at {:?}, code: {:?}, state: {:?}", contract_address, contract_code, contract_state);
state.patch_account(contract_address, contract_code, Self::snapshot_to_storage(contract_state))?;
Some(*contract_address)
},
@@ -449,7 +513,7 @@ impl Provider where {
(enc_code, self.encrypt(&address, &Self::iv_from_transaction(transaction), &Self::snapshot_from_storage(&storage))?)
},
};
- trace!("Private contract executed. code: {:?}, state: {:?}, result: {:?}", encrypted_code, encrypted_storage, result.output);
+ trace!(target: "privatetx", "Private contract executed. code: {:?}, state: {:?}, result: {:?}", encrypted_code, encrypted_storage, result.output);
Ok(PrivateExecutionResult {
code: encrypted_code,
state: encrypted_storage,
@@ -550,12 +614,12 @@ impl Provider where {
pub trait Importer {
/// Process received private transaction
- fn import_private_transaction(&self, _rlp: &[u8]) -> Result<(), Error>;
+ fn import_private_transaction(&self, _rlp: &[u8]) -> Result;
/// Add signed private transaction into the store
///
/// Creates corresponding public transaction if last required signature collected and sends it to the chain
- fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<(), Error>;
+ fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result;
}
// TODO [ToDr] Offload more heavy stuff to the IoService thread.
@@ -564,115 +628,59 @@ pub trait Importer {
// for both verification and execution.
impl Importer for Arc {
- fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> {
- trace!("Private transaction received");
+ fn import_private_transaction(&self, rlp: &[u8]) -> Result {
+ trace!(target: "privatetx", "Private transaction received");
let private_tx: PrivateTransaction = Rlp::new(rlp).as_val()?;
- let contract = private_tx.contract;
+ let private_tx_hash = private_tx.hash();
+ let contract = private_tx.contract();
let contract_validators = self.get_validators(BlockId::Latest, &contract)?;
let validation_account = contract_validators
.iter()
.find(|address| self.validator_accounts.contains(address));
- match validation_account {
- None => {
- // TODO [ToDr] This still seems a bit invalid, imho we should still import the transaction to the pool.
- // Importing to pool verifies correctness and nonce; here we are just blindly forwarding.
- //
- // Not for verification, broadcast further to peers
- self.broadcast_private_transaction(rlp.into());
- return Ok(());
- },
- Some(&validation_account) => {
- let hash = private_tx.hash();
- trace!("Private transaction taken for verification");
- let original_tx = self.extract_original_transaction(private_tx, &contract)?;
- trace!("Validating transaction: {:?}", original_tx);
- // Verify with the first account available
- trace!("The following account will be used for verification: {:?}", validation_account);
- let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE);
- self.transactions_for_verification.lock().add_transaction(
- original_tx,
- contract,
- validation_account,
- hash,
- self.pool_client(&nonce_cache),
- )?;
- let provider = Arc::downgrade(self);
- self.channel.send(ClientIoMessage::execute(move |_| {
- if let Some(provider) = provider.upgrade() {
- if let Err(e) = provider.process_queue() {
- debug!("Unable to process the queue: {}", e);
- }
- }
- })).map_err(|_| ErrorKind::ClientIsMalformed.into())
+ //extract the original transaction
+ let encrypted_data = private_tx.encrypted();
+ let transaction_bytes = self.decrypt(&contract, &encrypted_data)?;
+ let original_tx: UnverifiedTransaction = Rlp::new(&transaction_bytes).as_val()?;
+ let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE);
+ //add to the queue for further verification
+ self.transactions_for_verification.add_transaction(
+ original_tx,
+ validation_account.map(|&account| account),
+ private_tx,
+ self.pool_client(&nonce_cache),
+ )?;
+ let provider = Arc::downgrade(self);
+ let result = self.channel.send(ClientIoMessage::execute(move |_| {
+ if let Some(provider) = provider.upgrade() {
+ if let Err(e) = provider.process_verification_queue() {
+ warn!(target: "privatetx", "Unable to process the queue: {}", e);
+ }
}
+ }));
+ if let Err(e) = result {
+ warn!(target: "privatetx", "Error sending NewPrivateTransaction message: {:?}", e);
}
+ Ok(private_tx_hash)
}
- fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> {
+ fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result {
let tx: SignedPrivateTransaction = Rlp::new(rlp).as_val()?;
- trace!("Signature for private transaction received: {:?}", tx);
+ trace!(target: "privatetx", "Signature for private transaction received: {:?}", tx);
let private_hash = tx.private_transaction_hash();
- let desc = match self.transactions_for_signing.lock().get(&private_hash) {
- None => {
- // TODO [ToDr] Verification (we can't just blindly forward every transaction)
-
- // Not our transaction, broadcast further to peers
- self.broadcast_signed_private_transaction(rlp.into());
- return Ok(());
- },
- Some(desc) => desc,
- };
-
- let last = self.last_required_signature(&desc, tx.signature())?;
-
- if last {
- let mut signatures = desc.received_signatures.clone();
- signatures.push(tx.signature());
- let rsv: Vec = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect();
- //Create public transaction
- let public_tx = self.public_transaction(
- desc.state.clone(),
- &desc.original_transaction,
- &rsv,
- desc.original_transaction.nonce,
- desc.original_transaction.gas_price
- )?;
- trace!("Last required signature received, public transaction created: {:?}", public_tx);
- //Sign and add it to the queue
- let chain_id = desc.original_transaction.chain_id();
- let hash = public_tx.hash(chain_id);
- let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?;
- let password = find_account_password(&self.passwords, &*self.accounts, &signer_account);
- let signature = self.accounts.sign(signer_account, password, hash)?;
- let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?;
- match self.miner.import_own_transaction(&*self.client, signed.into()) {
- Ok(_) => trace!("Public transaction added to queue"),
- Err(err) => {
- trace!("Failed to add transaction to queue, error: {:?}", err);
- bail!(err);
- }
- }
- //Remove from store for signing
- match self.transactions_for_signing.lock().remove(&private_hash) {
- Ok(_) => {}
- Err(err) => {
- trace!("Failed to remove transaction from signing store, error: {:?}", err);
- bail!(err);
- }
- }
- } else {
- //Add signature to the store
- match self.transactions_for_signing.lock().add_signature(&private_hash, tx.signature()) {
- Ok(_) => trace!("Signature stored for private transaction"),
- Err(err) => {
- trace!("Failed to add signature to signing store, error: {:?}", err);
- bail!(err);
+ let provider = Arc::downgrade(self);
+ let result = self.channel.send(ClientIoMessage::execute(move |_| {
+ if let Some(provider) = provider.upgrade() {
+ if let Err(e) = provider.process_signature(&tx) {
+ warn!(target: "privatetx", "Unable to process the signature: {}", e);
}
}
+ }));
+ if let Err(e) = result {
+ warn!(target: "privatetx", "Error sending NewSignedPrivateTransaction message: {:?}", e);
}
- Ok(())
+ Ok(private_hash)
}
}
@@ -689,9 +697,9 @@ fn find_account_password(passwords: &Vec, account_provider: &AccountPr
impl ChainNotify for Provider {
fn new_blocks(&self, imported: Vec, _invalid: Vec, _route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) {
if !imported.is_empty() {
- trace!("New blocks imported, try to prune the queue");
- if let Err(err) = self.process_queue() {
- trace!("Cannot prune private transactions queue. error: {:?}", err);
+ trace!(target: "privatetx", "New blocks imported, try to prune the queue");
+ if let Err(err) = self.process_verification_queue() {
+ warn!(target: "privatetx", "Cannot prune private transactions queue. error: {:?}", err);
}
}
}
diff --git a/ethcore/private-tx/src/messages.rs b/ethcore/private-tx/src/messages.rs
index 57362e7ce..c0825fb59 100644
--- a/ethcore/private-tx/src/messages.rs
+++ b/ethcore/private-tx/src/messages.rs
@@ -25,15 +25,41 @@ use transaction::signature::{add_chain_replay_protection, check_replay_protectio
#[derive(Default, Debug, Clone, PartialEq, RlpEncodable, RlpDecodable, Eq)]
pub struct PrivateTransaction {
/// Encrypted data
- pub encrypted: Bytes,
+ encrypted: Bytes,
/// Address of the contract
- pub contract: Address,
+ contract: Address,
+ /// Hash
+ hash: H256,
}
impl PrivateTransaction {
- /// Compute hash on private transaction
+ /// Constructor
+ pub fn new(encrypted: Bytes, contract: Address) -> Self {
+ PrivateTransaction {
+ encrypted,
+ contract,
+ hash: 0.into(),
+ }.compute_hash()
+ }
+
+ fn compute_hash(mut self) -> PrivateTransaction {
+ self.hash = keccak(&*self.rlp_bytes());
+ self
+ }
+
+ /// Hash of the private transaction
pub fn hash(&self) -> H256 {
- keccak(&*self.rlp_bytes())
+ self.hash
+ }
+
+ /// Address of the contract
+ pub fn contract(&self) -> Address {
+ self.contract
+ }
+
+ /// Encrypted data
+ pub fn encrypted(&self) -> Bytes {
+ self.encrypted.clone()
}
}
@@ -49,6 +75,8 @@ pub struct SignedPrivateTransaction {
r: U256,
/// The S field of the signature
s: U256,
+ /// Hash
+ hash: H256,
}
impl SignedPrivateTransaction {
@@ -59,7 +87,13 @@ impl SignedPrivateTransaction {
r: sig.r().into(),
s: sig.s().into(),
v: add_chain_replay_protection(sig.v() as u64, chain_id),
- }
+ hash: 0.into(),
+ }.compute_hash()
+ }
+
+ fn compute_hash(mut self) -> SignedPrivateTransaction {
+ self.hash = keccak(&*self.rlp_bytes());
+ self
}
pub fn standard_v(&self) -> u8 { check_replay_protection(self.v) }
@@ -73,4 +107,9 @@ impl SignedPrivateTransaction {
pub fn private_transaction_hash(&self) -> H256 {
self.private_transaction_hash
}
+
+ /// Own hash
+ pub fn hash(&self) -> H256 {
+ self.hash
+ }
}
diff --git a/ethcore/private-tx/src/private_transactions.rs b/ethcore/private-tx/src/private_transactions.rs
index e16d6ab91..a0f58f9ca 100644
--- a/ethcore/private-tx/src/private_transactions.rs
+++ b/ethcore/private-tx/src/private_transactions.rs
@@ -15,62 +15,139 @@
// along with Parity. If not, see .
use std::sync::Arc;
+use std::cmp;
use std::collections::{HashMap, HashSet};
use bytes::Bytes;
use ethcore_miner::pool;
use ethereum_types::{H256, U256, Address};
+use heapsize::HeapSizeOf;
use ethkey::Signature;
+use messages::PrivateTransaction;
+use parking_lot::RwLock;
use transaction::{UnverifiedTransaction, SignedTransaction};
-
+use txpool;
+use txpool::{VerifiedTransaction, Verifier};
use error::{Error, ErrorKind};
+type Pool = txpool::Pool;
+
/// Maximum length for private transactions queues.
const MAX_QUEUE_LEN: usize = 8312;
-/// Desriptor for private transaction stored in queue for verification
-#[derive(Default, Debug, Clone, PartialEq, Eq)]
-pub struct PrivateTransactionDesc {
- /// Hash of the private transaction
- pub private_hash: H256,
- /// Contract's address used in private transaction
- pub contract: Address,
+/// Private transaction stored in queue for verification
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct VerifiedPrivateTransaction {
+ /// Original private transaction
+ pub private_transaction: PrivateTransaction,
/// Address that should be used for verification
- pub validator_account: Address,
+ pub validator_account: Option,
+ /// Resulting verified transaction
+ pub transaction: SignedTransaction,
+ /// Original transaction hash
+ pub transaction_hash: H256,
+ /// Original transaction sender
+ pub transaction_sender: Address,
+}
+
+impl txpool::VerifiedTransaction for VerifiedPrivateTransaction {
+ type Hash = H256;
+ type Sender = Address;
+
+ fn hash(&self) -> &H256 {
+ &self.transaction_hash
+ }
+
+ fn mem_usage(&self) -> usize {
+ self.transaction.heap_size_of_children()
+ }
+
+ fn sender(&self) -> &Address {
+ &self.transaction_sender
+ }
+}
+
+impl pool::ScoredTransaction for VerifiedPrivateTransaction {
+ fn priority(&self) -> pool::Priority {
+ pool::Priority::Regular
+ }
+
+ /// Gets transaction gas price.
+ fn gas_price(&self) -> &U256 {
+ &self.transaction.gas_price
+ }
+
+ /// Gets transaction nonce.
+ fn nonce(&self) -> U256 {
+ self.transaction.nonce
+ }
+}
+
+/// Checks readiness of transactions by looking if the transaction from sender already exists.
+/// Guarantees only one transaction per sender
+#[derive(Debug)]
+pub struct PrivateReadyState {
+ senders: HashSet,
+ state: C,
+}
+
+impl PrivateReadyState {
+ /// Create new State checker, given client interface.
+ pub fn new(
+ state: C,
+ ) -> Self {
+ PrivateReadyState {
+ senders: Default::default(),
+ state,
+ }
+ }
+}
+
+impl txpool::Ready for PrivateReadyState {
+ fn is_ready(&mut self, tx: &VerifiedPrivateTransaction) -> txpool::Readiness {
+ let sender = tx.sender();
+ let state = &self.state;
+ let state_nonce = state.account_nonce(sender);
+ if self.senders.contains(sender) {
+ txpool::Readiness::Future
+ } else {
+ self.senders.insert(*sender);
+ match tx.transaction.nonce.cmp(&state_nonce) {
+ cmp::Ordering::Greater => txpool::Readiness::Future,
+ cmp::Ordering::Less => txpool::Readiness::Stale,
+ cmp::Ordering::Equal => txpool::Readiness::Ready,
+ }
+ }
+ }
}
/// Storage for private transactions for verification
pub struct VerificationStore {
- /// Descriptors for private transactions in queue for verification with key - hash of the original transaction
- descriptors: HashMap,
- /// Queue with transactions for verification
- ///
- /// TODO [ToDr] Might actually be better to use `txpool` directly and:
- /// 1. Store descriptors inside `VerifiedTransaction`
- /// 2. Use custom `ready` implementation to only fetch one transaction per sender.
- /// 3. Get rid of passing dummy `block_number` and `timestamp`
- transactions: pool::TransactionQueue,
+ verification_pool: RwLock,
+ verification_options: pool::verifier::Options,
}
impl Default for VerificationStore {
fn default() -> Self {
VerificationStore {
- descriptors: Default::default(),
- transactions: pool::TransactionQueue::new(
- pool::Options {
- max_count: MAX_QUEUE_LEN,
- max_per_sender: MAX_QUEUE_LEN / 10,
- max_mem_usage: 8 * 1024 * 1024,
- },
- pool::verifier::Options {
- // TODO [ToDr] This should probably be based on some real values?
- minimal_gas_price: 0.into(),
- block_gas_limit: 8_000_000.into(),
- tx_gas_limit: U256::max_value(),
- no_early_reject: false
- },
- pool::PrioritizationStrategy::GasPriceOnly,
- )
+ verification_pool: RwLock::new(
+ txpool::Pool::new(
+ txpool::NoopListener,
+ pool::scoring::NonceAndGasPrice(pool::PrioritizationStrategy::GasPriceOnly),
+ pool::Options {
+ max_count: MAX_QUEUE_LEN,
+ max_per_sender: MAX_QUEUE_LEN / 10,
+ max_mem_usage: 8 * 1024 * 1024,
+ },
+ )
+ ),
+ verification_options: pool::verifier::Options {
+ // TODO [ToDr] This should probably be based on some real values?
+ minimal_gas_price: 0.into(),
+ block_gas_limit: 8_000_000.into(),
+ tx_gas_limit: U256::max_value(),
+ no_early_reject: false,
+ },
}
}
}
@@ -78,66 +155,43 @@ impl Default for VerificationStore {
impl VerificationStore {
/// Adds private transaction for verification into the store
pub fn add_transaction(
- &mut self,
+ &self,
transaction: UnverifiedTransaction,
- contract: Address,
- validator_account: Address,
- private_hash: H256,
+ validator_account: Option,
+ private_transaction: PrivateTransaction,
client: C,
) -> Result<(), Error> {
- if self.descriptors.len() > MAX_QUEUE_LEN {
- bail!(ErrorKind::QueueIsFull);
- }
- let transaction_hash = transaction.hash();
- if self.descriptors.get(&transaction_hash).is_some() {
- bail!(ErrorKind::PrivateTransactionAlreadyImported);
- }
-
- let results = self.transactions.import(
- client,
- vec![pool::verifier::Transaction::Unverified(transaction)],
- );
-
- // Verify that transaction was imported
- results.into_iter()
- .next()
- .expect("One transaction inserted; one result returned; qed")?;
-
- self.descriptors.insert(transaction_hash, PrivateTransactionDesc {
- private_hash,
- contract,
+ let options = self.verification_options.clone();
+ // Use pool's verifying pipeline for original transaction's verification
+ let verifier = pool::verifier::Verifier::new(client, options, Default::default(), None);
+ let unverified = pool::verifier::Transaction::Unverified(transaction);
+ let verified_tx = verifier.verify_transaction(unverified)?;
+ let signed_tx: SignedTransaction = verified_tx.signed().clone();
+ let signed_hash = signed_tx.hash();
+ let signed_sender = signed_tx.sender();
+ let verified = VerifiedPrivateTransaction {
+ private_transaction,
validator_account,
- });
-
+ transaction: signed_tx,
+ transaction_hash: signed_hash,
+ transaction_sender: signed_sender,
+ };
+ let mut pool = self.verification_pool.write();
+ pool.import(verified)?;
Ok(())
}
- /// Returns transactions ready for verification
+ /// Drains transactions ready for verification from the pool
/// Returns only one transaction per sender because several cannot be verified in a row without verification from other peers
- pub fn ready_transactions(&self, client: C) -> Vec> {
- // We never store PendingTransactions and we don't use internal cache,
- // so we don't need to provide real block number of timestamp here
- let block_number = 0;
- let timestamp = 0;
- let nonce_cap = None;
-
- self.transactions.collect_pending(client, block_number, timestamp, nonce_cap, |transactions| {
- // take only one transaction per sender
- let mut senders = HashSet::with_capacity(self.descriptors.len());
- transactions.filter(move |tx| senders.insert(tx.signed().sender())).collect()
- })
- }
-
- /// Returns descriptor of the corresponding private transaction
- pub fn private_transaction_descriptor(&self, transaction_hash: &H256) -> Result<&PrivateTransactionDesc, Error> {
- self.descriptors.get(transaction_hash).ok_or(ErrorKind::PrivateTransactionNotFound.into())
- }
-
- /// Remove transaction from the queue for verification
- pub fn remove_private_transaction(&mut self, transaction_hash: &H256) {
- self.descriptors.remove(transaction_hash);
- self.transactions.remove(&[*transaction_hash], true);
+ pub fn drain(&self, client: C) -> Vec> {
+ let ready = PrivateReadyState::new(client);
+ let transactions: Vec<_> = self.verification_pool.read().pending(ready).collect();
+ let mut pool = self.verification_pool.write();
+ for tx in &transactions {
+ pool.remove(tx.hash(), true);
+ }
+ transactions
}
}
diff --git a/ethcore/service/Cargo.toml b/ethcore/service/Cargo.toml
index e1ba1c81f..ce445f6d9 100644
--- a/ethcore/service/Cargo.toml
+++ b/ethcore/service/Cargo.toml
@@ -10,6 +10,7 @@ ethcore = { path = ".." }
ethcore-io = { path = "../../util/io" }
ethcore-private-tx = { path = "../private-tx" }
ethcore-sync = { path = "../sync" }
+ethereum-types = "0.3"
kvdb = { git = "https://github.com/paritytech/parity-common" }
log = "0.4"
stop-guard = { path = "../../util/stop-guard" }
diff --git a/ethcore/service/src/lib.rs b/ethcore/service/src/lib.rs
index d85a377cd..7ded2af79 100644
--- a/ethcore/service/src/lib.rs
+++ b/ethcore/service/src/lib.rs
@@ -19,6 +19,7 @@ extern crate ethcore;
extern crate ethcore_io as io;
extern crate ethcore_private_tx;
extern crate ethcore_sync as sync;
+extern crate ethereum_types;
extern crate kvdb;
extern crate stop_guard;
diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs
index 1ffb0d621..a6bbc4e10 100644
--- a/ethcore/service/src/service.rs
+++ b/ethcore/service/src/service.rs
@@ -21,6 +21,7 @@ use std::path::Path;
use std::time::Duration;
use ansi_term::Colour;
+use ethereum_types::H256;
use io::{IoContext, TimerToken, IoHandler, IoService, IoError};
use stop_guard::StopGuard;
@@ -54,12 +55,24 @@ impl PrivateTxService {
}
impl PrivateTxHandler for PrivateTxService {
- fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), String> {
- self.provider.import_private_transaction(rlp).map_err(|e| e.to_string())
+ fn import_private_transaction(&self, rlp: &[u8]) -> Result {
+ match self.provider.import_private_transaction(rlp) {
+ Ok(import_result) => Ok(import_result),
+ Err(err) => {
+ warn!(target: "privatetx", "Unable to import private transaction packet: {}", err);
+ bail!(err.to_string())
+ }
+ }
}
- fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), String> {
- self.provider.import_signed_private_transaction(rlp).map_err(|e| e.to_string())
+ fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result {
+ match self.provider.import_signed_private_transaction(rlp) {
+ Ok(import_result) => Ok(import_result),
+ Err(err) => {
+ warn!(target: "privatetx", "Unable to import signed private transaction packet: {}", err);
+ bail!(err.to_string())
+ }
+ }
}
}
diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs
index 62de03591..ebfe7bdef 100644
--- a/ethcore/src/client/chain_notify.rs
+++ b/ethcore/src/client/chain_notify.rs
@@ -26,9 +26,9 @@ pub enum ChainMessageType {
/// Consensus message
Consensus(Vec),
/// Message with private transaction
- PrivateTransaction(Vec),
+ PrivateTransaction(H256, Vec),
/// Message with signed private transaction
- SignedPrivateTransaction(Vec),
+ SignedPrivateTransaction(H256, Vec),
}
/// Route type to indicate whether it is enacted or retracted.
diff --git a/ethcore/src/test_helpers.rs b/ethcore/src/test_helpers.rs
index c873db5d1..06cfe7030 100644
--- a/ethcore/src/test_helpers.rs
+++ b/ethcore/src/test_helpers.rs
@@ -490,8 +490,8 @@ impl ChainNotify for TestNotify {
fn broadcast(&self, message: ChainMessageType) {
let data = match message {
ChainMessageType::Consensus(data) => data,
- ChainMessageType::SignedPrivateTransaction(data) => data,
- ChainMessageType::PrivateTransaction(data) => data,
+ ChainMessageType::SignedPrivateTransaction(_, data) => data,
+ ChainMessageType::PrivateTransaction(_, data) => data,
};
self.messages.write().push(data);
}
diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs
index 606aa39b3..5128df3b2 100644
--- a/ethcore/sync/src/api.rs
+++ b/ethcore/sync/src/api.rs
@@ -38,7 +38,8 @@ use std::net::{SocketAddr, AddrParseError};
use std::str::FromStr;
use parking_lot::RwLock;
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
- PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3};
+ PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3,
+ PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
use light::client::AsLightClient;
use light::Provider;
use light::net::{
@@ -522,8 +523,10 @@ impl ChainNotify for EthSync {
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service, &self.eth_handler.overlay);
match message_type {
ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message),
- ChainMessageType::PrivateTransaction(message) => self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, message),
- ChainMessageType::SignedPrivateTransaction(message) => self.eth_handler.sync.write().propagate_signed_private_transaction(&mut sync_io, message),
+ ChainMessageType::PrivateTransaction(transaction_hash, message) =>
+ self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PRIVATE_TRANSACTION_PACKET, message),
+ ChainMessageType::SignedPrivateTransaction(transaction_hash, message) =>
+ self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, message),
}
});
}
diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs
index 8547be7b3..c30c60a7c 100644
--- a/ethcore/sync/src/chain/handler.rs
+++ b/ethcore/sync/src/chain/handler.rs
@@ -552,6 +552,7 @@ impl SyncHandler {
asking_hash: None,
ask_time: Instant::now(),
last_sent_transactions: HashSet::new(),
+ last_sent_private_transactions: HashSet::new(),
expired: false,
confirmation: if sync.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
asking_snapshot_data: None,
@@ -631,21 +632,29 @@ impl SyncHandler {
}
/// Called when peer sends us signed private transaction packet
- fn on_signed_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
+ fn on_signed_private_transaction(sync: &mut ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
return Ok(());
}
trace!(target: "sync", "Received signed private transaction packet from {:?}", peer_id);
- if let Err(e) = sync.private_tx_handler.import_signed_private_transaction(r.as_raw()) {
- trace!(target: "sync", "Ignoring the message, error queueing: {}", e);
- }
+ match sync.private_tx_handler.import_signed_private_transaction(r.as_raw()) {
+ Ok(transaction_hash) => {
+ //don't send the packet back
+ if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
+ peer.last_sent_private_transactions.insert(transaction_hash);
+ }
+ },
+ Err(e) => {
+ trace!(target: "sync", "Ignoring the message, error queueing: {}", e);
+ }
+ }
Ok(())
}
/// Called when peer sends us new private transaction packet
- fn on_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
+ fn on_private_transaction(sync: &mut ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
return Ok(());
@@ -653,9 +662,17 @@ impl SyncHandler {
trace!(target: "sync", "Received private transaction packet from {:?}", peer_id);
- if let Err(e) = sync.private_tx_handler.import_private_transaction(r.as_raw()) {
- trace!(target: "sync", "Ignoring the message, error queueing: {}", e);
- }
+ match sync.private_tx_handler.import_private_transaction(r.as_raw()) {
+ Ok(transaction_hash) => {
+ //don't send the packet back
+ if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
+ peer.last_sent_private_transactions.insert(transaction_hash);
+ }
+ },
+ Err(e) => {
+ trace!(target: "sync", "Ignoring the message, error queueing: {}", e);
+ }
+ }
Ok(())
}
}
diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs
index b53c38b43..06bd3feba 100644
--- a/ethcore/sync/src/chain/mod.rs
+++ b/ethcore/sync/src/chain/mod.rs
@@ -173,8 +173,8 @@ pub const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12;
pub const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
pub const SNAPSHOT_DATA_PACKET: u8 = 0x14;
pub const CONSENSUS_DATA_PACKET: u8 = 0x15;
-const PRIVATE_TRANSACTION_PACKET: u8 = 0x16;
-const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17;
+pub const PRIVATE_TRANSACTION_PACKET: u8 = 0x16;
+pub const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17;
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
@@ -324,6 +324,8 @@ pub struct PeerInfo {
ask_time: Instant,
/// Holds a set of transactions recently sent to this peer to avoid spamming.
last_sent_transactions: HashSet,
+ /// Holds a set of private transactions and their signatures recently sent to this peer to avoid spamming.
+ last_sent_private_transactions: HashSet,
/// Pending request is expired and result should be ignored
expired: bool,
/// Peer fork confirmation status
@@ -353,6 +355,10 @@ impl PeerInfo {
self.expired = true;
}
}
+
+ fn reset_private_stats(&mut self) {
+ self.last_sent_private_transactions.clear();
+ }
}
#[cfg(not(test))]
@@ -1056,8 +1062,15 @@ impl ChainSync {
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2.0 { Some(*id) } else { None }).collect()
}
- fn get_private_transaction_peers(&self) -> Vec {
- self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0 { Some(*id) } else { None }).collect()
+ fn get_private_transaction_peers(&self, transaction_hash: &H256) -> Vec {
+ self.peers.iter().filter_map(
+ |(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0
+ && !p.last_sent_private_transactions.contains(transaction_hash) {
+ Some(*id)
+ } else {
+ None
+ }
+ ).collect()
}
/// Maintain other peers. Send out any new blocks and transactions
@@ -1085,8 +1098,10 @@ impl ChainSync {
// Select random peer to re-broadcast transactions to.
let peer = random::new().gen_range(0, self.peers.len());
trace!(target: "sync", "Re-broadcasting transactions to a random peer.");
- self.peers.values_mut().nth(peer).map(|peer_info|
- peer_info.last_sent_transactions.clear()
+ self.peers.values_mut().nth(peer).map(|peer_info| {
+ peer_info.last_sent_transactions.clear();
+ peer_info.reset_private_stats()
+ }
);
}
}
@@ -1127,13 +1142,8 @@ impl ChainSync {
}
/// Broadcast private transaction message to peers.
- pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, packet: Bytes) {
- SyncPropagator::propagate_private_transaction(self, io, packet);
- }
-
- /// Broadcast signed private transaction message to peers.
- pub fn propagate_signed_private_transaction(&mut self, io: &mut SyncIo, packet: Bytes) {
- SyncPropagator::propagate_signed_private_transaction(self, io, packet);
+ pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) {
+ SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet);
}
}
@@ -1256,6 +1266,7 @@ pub mod tests {
asking_hash: None,
ask_time: Instant::now(),
last_sent_transactions: HashSet::new(),
+ last_sent_private_transactions: HashSet::new(),
expired: false,
confirmation: super::ForkConfirmation::Confirmed,
snapshot_number: None,
diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs
index aabe90c93..102a31712 100644
--- a/ethcore/sync/src/chain/propagator.rs
+++ b/ethcore/sync/src/chain/propagator.rs
@@ -36,8 +36,6 @@ use super::{
CONSENSUS_DATA_PACKET,
NEW_BLOCK_HASHES_PACKET,
NEW_BLOCK_PACKET,
- PRIVATE_TRANSACTION_PACKET,
- SIGNED_PRIVATE_TRANSACTION_PACKET,
TRANSACTIONS_PACKET,
};
@@ -293,20 +291,14 @@ impl SyncPropagator {
}
/// Broadcast private transaction message to peers.
- pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, packet: Bytes) {
- let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers());
+ pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) {
+ let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash));
trace!(target: "sync", "Sending private transaction packet to {:?}", lucky_peers);
for peer_id in lucky_peers {
- SyncPropagator::send_packet(io, peer_id, PRIVATE_TRANSACTION_PACKET, packet.clone());
- }
- }
-
- /// Broadcast signed private transaction message to peers.
- pub fn propagate_signed_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, packet: Bytes) {
- let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers());
- trace!(target: "sync", "Sending signed private transaction packet to {:?}", lucky_peers);
- for peer_id in lucky_peers {
- SyncPropagator::send_packet(io, peer_id, SIGNED_PRIVATE_TRANSACTION_PACKET, packet.clone());
+ if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
+ peer.last_sent_private_transactions.insert(transaction_hash);
+ }
+ SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone());
}
}
@@ -428,6 +420,7 @@ mod tests {
asking_hash: None,
ask_time: Instant::now(),
last_sent_transactions: HashSet::new(),
+ last_sent_private_transactions: HashSet::new(),
expired: false,
confirmation: ForkConfirmation::Confirmed,
snapshot_number: None,
diff --git a/ethcore/sync/src/private_tx.rs b/ethcore/sync/src/private_tx.rs
index d7434c8bd..03928c22d 100644
--- a/ethcore/sync/src/private_tx.rs
+++ b/ethcore/sync/src/private_tx.rs
@@ -15,26 +15,29 @@
// along with Parity. If not, see .
use parking_lot::Mutex;
+use ethereum_types::H256;
/// Trait which should be implemented by a private transaction handler.
pub trait PrivateTxHandler: Send + Sync + 'static {
/// Function called on new private transaction received.
- fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), String>;
+ /// Returns the hash of the imported transaction
+ fn import_private_transaction(&self, rlp: &[u8]) -> Result;
/// Function called on new signed private transaction received.
- fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), String>;
+ /// Returns the hash of the imported transaction
+ fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result;
}
/// Nonoperative private transaction handler.
pub struct NoopPrivateTxHandler;
impl PrivateTxHandler for NoopPrivateTxHandler {
- fn import_private_transaction(&self, _rlp: &[u8]) -> Result<(), String> {
- Ok(())
+ fn import_private_transaction(&self, _rlp: &[u8]) -> Result {
+ Ok(H256::default())
}
- fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<(), String> {
- Ok(())
+ fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result {
+ Ok(H256::default())
}
}
@@ -48,13 +51,13 @@ pub struct SimplePrivateTxHandler {
}
impl PrivateTxHandler for SimplePrivateTxHandler {
- fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), String> {
+ fn import_private_transaction(&self, rlp: &[u8]) -> Result {
self.txs.lock().push(rlp.to_vec());
- Ok(())
+ Ok(H256::default())
}
- fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), String> {
+ fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result {
self.signed_txs.lock().push(rlp.to_vec());
- Ok(())
+ Ok(H256::default())
}
}
diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs
index 59db57dc5..d75d71ea9 100644
--- a/ethcore/sync/src/tests/helpers.rs
+++ b/ethcore/sync/src/tests/helpers.rs
@@ -33,7 +33,7 @@ use ethcore::test_helpers;
use sync_io::SyncIo;
use io::{IoChannel, IoContext, IoHandler};
use api::WARP_SYNC_PROTOCOL_ID;
-use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3};
+use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
use SyncConfig;
use private_tx::SimplePrivateTxHandler;
@@ -230,8 +230,10 @@ impl EthPeer where C: FlushingBlockChainClient {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None);
match message {
ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data),
- ChainMessageType::PrivateTransaction(data) => self.sync.write().propagate_private_transaction(&mut io, data),
- ChainMessageType::SignedPrivateTransaction(data) => self.sync.write().propagate_signed_private_transaction(&mut io, data),
+ ChainMessageType::PrivateTransaction(transaction_hash, data) =>
+ self.sync.write().propagate_private_transaction(&mut io, transaction_hash, PRIVATE_TRANSACTION_PACKET, data),
+ ChainMessageType::SignedPrivateTransaction(transaction_hash, data) =>
+ self.sync.write().propagate_private_transaction(&mut io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, data),
}
}
diff --git a/ethcore/sync/src/tests/private.rs b/ethcore/sync/src/tests/private.rs
index 04b414b94..9b39aed76 100644
--- a/ethcore/sync/src/tests/private.rs
+++ b/ethcore/sync/src/tests/private.rs
@@ -24,11 +24,12 @@ use ethcore::CreateContractAddress;
use transaction::{Transaction, Action};
use ethcore::executive::{contract_address};
use ethcore::test_helpers::{push_block_with_transactions};
-use ethcore_private_tx::{Provider, ProviderConfig, NoopEncryptor, Importer};
+use ethcore_private_tx::{Provider, ProviderConfig, NoopEncryptor, Importer, SignedPrivateTransaction};
use ethcore::account_provider::AccountProvider;
use ethkey::{KeyPair};
use tests::helpers::{TestNet, TestIoHandler};
use rustc_hex::FromHex;
+use rlp::Rlp;
use SyncConfig;
fn seal_spec() -> Spec {
@@ -144,6 +145,8 @@ fn send_private_transaction() {
//process signed response
let signed_private_transaction = received_signed_private_transactions[0].clone();
assert!(pm0.import_signed_private_transaction(&signed_private_transaction).is_ok());
+ let signature: SignedPrivateTransaction = Rlp::new(&signed_private_transaction).as_val().unwrap();
+ assert!(pm0.process_signature(&signature).is_ok());
let local_transactions = net.peer(0).miner.local_transactions();
assert_eq!(local_transactions.len(), 1);
}
diff --git a/miner/src/pool/local_transactions.rs b/miner/src/pool/local_transactions.rs
index a1c69ef22..a71d9244c 100644
--- a/miner/src/pool/local_transactions.rs
+++ b/miner/src/pool/local_transactions.rs
@@ -20,7 +20,7 @@ use std::{fmt, sync::Arc};
use ethereum_types::H256;
use linked_hash_map::LinkedHashMap;
-use pool::VerifiedTransaction as Transaction;
+use pool::{VerifiedTransaction as Transaction, ScoredTransaction};
use txpool::{self, VerifiedTransaction};
/// Status of local transaction.
diff --git a/miner/src/pool/mod.rs b/miner/src/pool/mod.rs
index 4a1223226..ccfbba7f8 100644
--- a/miner/src/pool/mod.rs
+++ b/miner/src/pool/mod.rs
@@ -24,10 +24,10 @@ use txpool;
mod listener;
mod queue;
mod ready;
-mod scoring;
pub mod client;
pub mod local_transactions;
+pub mod scoring;
pub mod verifier;
#[cfg(test)]
@@ -84,7 +84,7 @@ impl PendingSettings {
/// Transaction priority.
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone, Copy)]
-pub(crate) enum Priority {
+pub enum Priority {
/// Regular transactions received over the network. (no priority boost)
Regular,
/// Transactions from retracted blocks (medium priority)
@@ -108,6 +108,18 @@ impl Priority {
}
}
+/// Scoring properties for verified transaction.
+pub trait ScoredTransaction {
+ /// Gets transaction priority.
+ fn priority(&self) -> Priority;
+
+ /// Gets transaction gas price.
+ fn gas_price(&self) -> &U256;
+
+ /// Gets transaction nonce.
+ fn nonce(&self) -> U256;
+}
+
/// Verified transaction stored in the pool.
#[derive(Debug, PartialEq, Eq)]
pub struct VerifiedTransaction {
@@ -137,11 +149,6 @@ impl VerifiedTransaction {
}
}
- /// Gets transaction priority.
- pub(crate) fn priority(&self) -> Priority {
- self.priority
- }
-
/// Gets transaction insertion id.
pub(crate) fn insertion_id(&self) -> usize {
self.insertion_id
@@ -175,3 +182,19 @@ impl txpool::VerifiedTransaction for VerifiedTransaction {
&self.sender
}
}
+
+impl ScoredTransaction for VerifiedTransaction {
+ fn priority(&self) -> Priority {
+ self.priority
+ }
+
+ /// Gets transaction gas price.
+ fn gas_price(&self) -> &U256 {
+ &self.transaction.gas_price
+ }
+
+ /// Gets transaction nonce.
+ fn nonce(&self) -> U256 {
+ self.transaction.nonce
+ }
+}
diff --git a/miner/src/pool/scoring.rs b/miner/src/pool/scoring.rs
index dbe3c08f4..61fcf4e41 100644
--- a/miner/src/pool/scoring.rs
+++ b/miner/src/pool/scoring.rs
@@ -31,7 +31,7 @@ use std::cmp;
use ethereum_types::U256;
use txpool::{self, scoring};
-use super::{verifier, PrioritizationStrategy, VerifiedTransaction};
+use super::{verifier, PrioritizationStrategy, VerifiedTransaction, ScoredTransaction};
/// Transaction with the same (sender, nonce) can be replaced only if
/// `new_gas_price > old_gas_price + old_gas_price >> SHIFT`
@@ -67,23 +67,23 @@ impl NonceAndGasPrice {
}
}
-impl txpool::Scoring for NonceAndGasPrice {
+impl txpool::Scoring
for NonceAndGasPrice where P: ScoredTransaction + txpool::VerifiedTransaction {
type Score = U256;
type Event = ();
- fn compare(&self, old: &VerifiedTransaction, other: &VerifiedTransaction) -> cmp::Ordering {
- old.transaction.nonce.cmp(&other.transaction.nonce)
+ fn compare(&self, old: &P, other: &P) -> cmp::Ordering {
+ old.nonce().cmp(&other.nonce())
}
- fn choose(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> scoring::Choice {
- if old.transaction.nonce != new.transaction.nonce {
+ fn choose(&self, old: &P, new: &P) -> scoring::Choice {
+ if old.nonce() != new.nonce() {
return scoring::Choice::InsertNew
}
- let old_gp = old.transaction.gas_price;
- let new_gp = new.transaction.gas_price;
+ let old_gp = old.gas_price();
+ let new_gp = new.gas_price();
- let min_required_gp = bump_gas_price(old_gp);
+ let min_required_gp = bump_gas_price(*old_gp);
match min_required_gp.cmp(&new_gp) {
cmp::Ordering::Greater => scoring::Choice::RejectNew,
@@ -91,7 +91,7 @@ impl txpool::Scoring for NonceAndGasPrice {
}
}
- fn update_scores(&self, txs: &[txpool::Transaction], scores: &mut [U256], change: scoring::Change) {
+ fn update_scores(&self, txs: &[txpool::Transaction], scores: &mut [U256], change: scoring::Change) {
use self::scoring::Change;
match change {
@@ -101,7 +101,7 @@ impl txpool::Scoring for NonceAndGasPrice {
assert!(i < txs.len());
assert!(i < scores.len());
- scores[i] = txs[i].transaction.transaction.gas_price;
+ scores[i] = *txs[i].transaction.gas_price();
let boost = match txs[i].priority() {
super::Priority::Local => 15,
super::Priority::Retracted => 10,
@@ -122,10 +122,10 @@ impl txpool::Scoring for NonceAndGasPrice {
}
}
- fn should_replace(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> scoring::Choice {
- if old.sender == new.sender {
+ fn should_replace(&self, old: &P, new: &P) -> scoring::Choice {
+ if old.sender() == new.sender() {
// prefer earliest transaction
- match new.transaction.nonce.cmp(&old.transaction.nonce) {
+ match new.nonce().cmp(&old.nonce()) {
cmp::Ordering::Less => scoring::Choice::ReplaceOld,
cmp::Ordering::Greater => scoring::Choice::RejectNew,
cmp::Ordering::Equal => self.choose(old, new),
@@ -134,8 +134,8 @@ impl txpool::Scoring for NonceAndGasPrice {
// accept local transactions over the limit
scoring::Choice::InsertNew
} else {
- let old_score = (old.priority(), old.transaction.gas_price);
- let new_score = (new.priority(), new.transaction.gas_price);
+ let old_score = (old.priority(), old.gas_price());
+ let new_score = (new.priority(), new.gas_price());
if new_score > old_score {
scoring::Choice::ReplaceOld
} else {
@@ -144,7 +144,7 @@ impl txpool::Scoring for NonceAndGasPrice {
}
}
- fn should_ignore_sender_limit(&self, new: &VerifiedTransaction) -> bool {
+ fn should_ignore_sender_limit(&self, new: &P) -> bool {
new.priority().is_local()
}
}
@@ -185,12 +185,8 @@ mod tests {
};
let keypair = Random.generate().unwrap();
- let txs = vec![tx1, tx2, tx3, tx4].into_iter().enumerate().map(|(i, tx)| {
- let verified = tx.unsigned().sign(keypair.secret(), None).verified();
- txpool::Transaction {
- insertion_id: i as u64,
- transaction: Arc::new(verified),
- }
+ let txs = vec![tx1, tx2, tx3, tx4].into_iter().map(|tx| {
+ tx.unsigned().sign(keypair.secret(), None).verified()
}).collect::>();
assert_eq!(scoring.should_replace(&txs[0], &txs[1]), RejectNew);
@@ -213,11 +209,7 @@ mod tests {
gas_price: 1,
..Default::default()
};
- let verified_tx = tx.signed().verified();
- txpool::Transaction {
- insertion_id: 0,
- transaction: Arc::new(verified_tx),
- }
+ tx.signed().verified()
};
let tx_regular_high_gas = {
let tx = Tx {
@@ -225,11 +217,7 @@ mod tests {
gas_price: 10,
..Default::default()
};
- let verified_tx = tx.signed().verified();
- txpool::Transaction {
- insertion_id: 1,
- transaction: Arc::new(verified_tx),
- }
+ tx.signed().verified()
};
let tx_local_low_gas = {
let tx = Tx {
@@ -239,10 +227,7 @@ mod tests {
};
let mut verified_tx = tx.signed().verified();
verified_tx.priority = ::pool::Priority::Local;
- txpool::Transaction {
- insertion_id: 2,
- transaction: Arc::new(verified_tx),
- }
+ verified_tx
};
let tx_local_high_gas = {
let tx = Tx {
@@ -252,10 +237,7 @@ mod tests {
};
let mut verified_tx = tx.signed().verified();
verified_tx.priority = ::pool::Priority::Local;
- txpool::Transaction {
- insertion_id: 3,
- transaction: Arc::new(verified_tx),
- }
+ verified_tx
};
assert_eq!(scoring.should_replace(&tx_regular_low_gas, &tx_regular_high_gas), ReplaceOld);