diff --git a/Cargo.lock b/Cargo.lock index be413453a..152da9223 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1230,6 +1230,7 @@ dependencies = [ "ethabi-derive 8.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore 1.12.0", "ethcore-call-contract 0.1.0", + "ethcore-db 0.1.0", "ethcore-io 1.12.0", "ethcore-miner 1.12.0", "ethereum-types 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1237,7 +1238,11 @@ dependencies = [ "ethkey 0.3.0", "fetch 0.1.0", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "hash-db 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", + "journaldb 0.2.0", "keccak-hash 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "keccak-hasher 0.1.1", + "kvdb 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "machine 0.1.0", "parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/db/src/db.rs b/ethcore/db/src/db.rs index d451a7e1e..8f5390639 100644 --- a/ethcore/db/src/db.rs +++ b/ethcore/db/src/db.rs @@ -41,8 +41,10 @@ pub const COL_ACCOUNT_BLOOM: Option = Some(5); pub const COL_NODE_INFO: Option = Some(6); /// Column for the light client chain. pub const COL_LIGHT_CHAIN: Option = Some(7); +/// Column for the private transactions state. +pub const COL_PRIVATE_TRANSACTIONS_STATE: Option = Some(8); /// Number of columns in DB -pub const NUM_COLUMNS: Option = Some(8); +pub const NUM_COLUMNS: Option = Some(9); /// Modes for updating caches. #[derive(Clone, Copy)] diff --git a/ethcore/private-tx/Cargo.toml b/ethcore/private-tx/Cargo.toml index c0050bbb2..2c793b042 100644 --- a/ethcore/private-tx/Cargo.toml +++ b/ethcore/private-tx/Cargo.toml @@ -14,6 +14,7 @@ ethabi = "8.0" ethabi-contract = "8.0" ethabi-derive = "8.0" ethcore = { path = ".." } +ethcore-db = { path = "../db" } ethcore-call-contract = { path = "../call-contract" } ethcore-io = { path = "../../util/io" } ethcore-miner = { path = "../../miner" } @@ -23,9 +24,13 @@ ethkey = { path = "../../accounts/ethkey" } fetch = { path = "../../util/fetch" } futures = "0.1" parity-util-mem = "0.2.0" +hash-db = "0.15.0" keccak-hash = "0.2.0" +keccak-hasher = { path = "../../util/keccak-hasher" } +kvdb = "0.1" log = "0.4" machine = { path = "../machine" } +journaldb = { path = "../../util/journaldb" } parity-bytes = "0.1" parity-crypto = "0.4.0" parking_lot = "0.8" diff --git a/ethcore/private-tx/src/error.rs b/ethcore/private-tx/src/error.rs index 4d26599a5..dada20924 100644 --- a/ethcore/private-tx/src/error.rs +++ b/ethcore/private-tx/src/error.rs @@ -99,6 +99,12 @@ pub enum Error { /// Account for signing requests to key server not set. #[display(fmt = "Account for signing requests to key server not set.")] KeyServerAccountNotSet, + /// Private state for the contract was not found in the local storage. + #[display(fmt = "Private state for the contract was not found in the local storage.")] + PrivateStateNotFound, + /// Cannot write state to the local database. + #[display(fmt = "Cannot write state to the local database.")] + DatabaseWriteError, /// Encryption key is not found on key server. #[display(fmt = "Encryption key is not found on key server for {}", _0)] EncryptionKeyNotFound(Address), diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index 283df9b7a..087f85b63 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -22,6 +22,8 @@ mod private_transactions; mod messages; mod error; mod log; +mod state_store; +mod private_state_db; extern crate account_state; extern crate client_traits; @@ -29,6 +31,7 @@ extern crate common_types as types; extern crate ethabi; extern crate ethcore; extern crate ethcore_call_contract as call_contract; +extern crate ethcore_db; extern crate ethcore_io as io; extern crate ethcore_miner; extern crate ethereum_types; @@ -37,8 +40,12 @@ extern crate ethkey; extern crate fetch; extern crate futures; extern crate parity_util_mem; +extern crate hash_db; extern crate keccak_hash as hash; +extern crate keccak_hasher; +extern crate kvdb; extern crate machine; +extern crate journaldb; extern crate parity_bytes as bytes; extern crate parity_crypto as crypto; extern crate parking_lot; @@ -77,18 +84,21 @@ pub use encryptor::{Encryptor, SecretStoreEncryptor, EncryptorConfig, NoopEncryp pub use key_server_keys::{KeyProvider, SecretStoreKeys, StoringKeyProvider}; pub use private_transactions::{VerifiedPrivateTransaction, VerificationStore, PrivateTransactionSigningDesc, SigningStore}; pub use messages::{PrivateTransaction, SignedPrivateTransaction}; +pub use private_state_db::PrivateStateDB; pub use error::Error; pub use log::{Logging, TransactionLog, ValidatorLog, PrivateTxStatus, FileLogsSerializer}; +use state_store::{PrivateStateStorage, RequestType}; use std::sync::{Arc, Weak}; use std::collections::{HashMap, HashSet, BTreeMap}; +use std::time::Duration; use ethereum_types::{H128, H256, U256, Address, BigEndianHash}; use hash::keccak; use rlp::*; use parking_lot::RwLock; use bytes::Bytes; use ethkey::{Signature, recover, public_to_address}; -use io::IoChannel; +use io::{IoChannel, IoHandler, IoContext, TimerToken}; use machine::{ executive::{Executive, TransactOptions, contract_address as ethcore_contract_address}, executed::Executed as FlatExecuted, @@ -107,6 +117,7 @@ use state_db::StateDB; use account_state::State; use trace::{Tracer, VMTracer}; use call_contract::CallContract; +use kvdb::KeyValueDB; use rustc_hex::FromHex; use ethabi::FunctionOutputDecoder; use vm::CreateContractAddress; @@ -128,6 +139,12 @@ const INITIAL_PRIVATE_CONTRACT_VER: usize = 1; /// Version for the private contract notification about private state changes added const PRIVATE_CONTRACT_WITH_NOTIFICATION_VER: usize = 2; +/// Timer for private state retrieval +const STATE_RETRIEVAL_TIMER: TimerToken = 0; + +/// Timer for private state retrieval, 5 secs duration +const STATE_RETRIEVAL_TICK: Duration = Duration::from_secs(5); + /// Configurtion for private transaction provider #[derive(Default, PartialEq, Debug, Clone)] pub struct ProviderConfig { @@ -137,6 +154,8 @@ pub struct ProviderConfig { pub signer_account: Option
, /// Path to private tx logs pub logs_path: Option, + /// Provider should store the state of the private contract offchain (in DB) + pub use_offchain_storage: bool, } #[derive(Debug)] @@ -198,6 +217,8 @@ pub struct Provider { channel: IoChannel, keys_provider: Arc, logging: Option, + use_offchain_storage: bool, + state_storage: PrivateStateStorage, } #[derive(Debug)] @@ -218,6 +239,7 @@ impl Provider { config: ProviderConfig, channel: IoChannel, keys_provider: Arc, + db: Arc, ) -> Self { keys_provider.update_acl_contract(); Provider { @@ -233,9 +255,16 @@ impl Provider { channel, keys_provider, logging: config.logs_path.map(|path| Logging::new(Arc::new(FileLogsSerializer::with_path(path)))), + use_offchain_storage: config.use_offchain_storage, + state_storage: PrivateStateStorage::new(db), } } + /// Returns private state DB + pub fn private_state_db(&self) -> Arc { + self.state_storage.private_state_db() + } + // TODO [ToDr] Don't use `ChainNotify` here! // Better to create a separate notification type for this. /// Adds an actor to be notified on certain events @@ -273,22 +302,42 @@ impl Provider { // best would be to change the API and only allow H256 instead of BlockID // in private-tx to avoid such mistakes. let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?; - let private_state = self.execute_private_transaction(BlockId::Latest, &signed_transaction)?; - trace!(target: "privatetx", "Private transaction created, encrypted transaction: {:?}, private state: {:?}", private, private_state); - let contract_validators = self.get_validators(BlockId::Latest, &contract)?; - trace!(target: "privatetx", "Required validators: {:?}", contract_validators); - let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); - trace!(target: "privatetx", "Hashed effective private state for sender: {:?}", private_state_hash); - self.transactions_for_signing.write().add_transaction(private.hash(), signed_transaction, &contract_validators, private_state, contract_nonce)?; - self.broadcast_private_transaction(private.hash(), private.rlp_bytes()); - if let Some(ref logging) = self.logging { - logging.private_tx_created(&tx_hash, &contract_validators); + let private_state = self.execute_private_transaction(BlockId::Latest, &signed_transaction); + match private_state { + Err(err) => { + match err { + Error::PrivateStateNotFound => { + trace!(target: "privatetx", "Private state for the contract not found, requesting from peers"); + if let Some(ref logging) = self.logging { + let contract_validators = self.get_validators(BlockId::Latest, &contract)?; + logging.private_tx_created(&tx_hash, &contract_validators); + logging.private_state_request(&tx_hash); + } + let request = RequestType::Creation(signed_transaction); + self.request_private_state(&contract, request)?; + }, + _ => {}, + } + Err(err) + } + Ok(private_state) => { + trace!(target: "privatetx", "Private transaction created, encrypted transaction: {:?}, private state: {:?}", private, private_state); + let contract_validators = self.get_validators(BlockId::Latest, &contract)?; + trace!(target: "privatetx", "Required validators: {:?}", contract_validators); + let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); + trace!(target: "privatetx", "Hashed effective private state for sender: {:?}", private_state_hash); + self.transactions_for_signing.write().add_transaction(private.hash(), signed_transaction, &contract_validators, private_state, contract_nonce)?; + self.broadcast_private_transaction(private.hash(), private.rlp_bytes()); + if let Some(ref logging) = self.logging { + logging.private_tx_created(&tx_hash, &contract_validators); + } + Ok(Receipt { + hash: tx_hash, + contract_address: contract, + status_code: 0, + }) + } } - Ok(Receipt { - hash: tx_hash, - contract_address: contract, - status_code: 0, - }) } /// Calculate hash from united private state and contract nonce @@ -312,55 +361,52 @@ impl Provider { ) } - /// Retrieve and verify the first available private transaction for every sender - fn process_verification_queue(&self) -> Result<(), Error> { - let process_transaction = |transaction: &VerifiedPrivateTransaction| -> Result<_, String> { - let private_hash = transaction.private_transaction.hash(); - match transaction.validator_account { - None => { + fn process_verification_transaction(&self, transaction: &VerifiedPrivateTransaction) -> Result<(), Error> { + let private_hash = transaction.private_transaction.hash(); + match transaction.validator_account { + None => { + trace!(target: "privatetx", "Propagating transaction further"); + self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes()); + return Ok(()); + } + Some(validator_account) => { + if !self.validator_accounts.contains(&validator_account) { trace!(target: "privatetx", "Propagating transaction further"); self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes()); return Ok(()); } - Some(validator_account) => { - if !self.validator_accounts.contains(&validator_account) { - trace!(target: "privatetx", "Propagating transaction further"); - self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes()); - return Ok(()); - } - let contract = Self::contract_address_from_transaction(&transaction.transaction) - .map_err(|_| "Incorrect type of action for the transaction")?; - // TODO #9825 [ToDr] Usage of BlockId::Latest - let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest); - if let Err(e) = contract_nonce { - return Err(format!("Cannot retrieve contract nonce: {:?}", e).into()); - } - let contract_nonce = contract_nonce.expect("Error was checked before"); - let private_state = self.execute_private_transaction(BlockId::Latest, &transaction.transaction); - if let Err(e) = private_state { - return Err(format!("Cannot retrieve private state: {:?}", e).into()); - } - let private_state = private_state.expect("Error was checked before"); - let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); - trace!(target: "privatetx", "Hashed effective private state for validator: {:?}", private_state_hash); - let signed_state = self.accounts.sign(validator_account, private_state_hash); - if let Err(e) = signed_state { - return Err(format!("Cannot sign the state: {:?}", e).into()); - } - let signed_state = signed_state.expect("Error was checked before"); - let signed_private_transaction = SignedPrivateTransaction::new(private_hash, signed_state, None); - trace!(target: "privatetx", "Sending signature for private transaction: {:?}", signed_private_transaction); - self.broadcast_signed_private_transaction(signed_private_transaction.hash(), signed_private_transaction.rlp_bytes()); - } + let contract = Self::contract_address_from_transaction(&transaction.transaction)?; + // TODO #9825 [ToDr] Usage of BlockId::Latest + let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?; + let private_state = self.execute_private_transaction(BlockId::Latest, &transaction.transaction)?; + let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); + trace!(target: "privatetx", "Hashed effective private state for validator: {:?}", private_state_hash); + let signed_state = self.accounts.sign(validator_account, private_state_hash)?; + let signed_private_transaction = SignedPrivateTransaction::new(private_hash, signed_state, None); + trace!(target: "privatetx", "Sending signature for private transaction: {:?}", signed_private_transaction); + self.broadcast_signed_private_transaction(signed_private_transaction.hash(), signed_private_transaction.rlp_bytes()); } - Ok(()) - }; + } + Ok(()) + } + + /// Retrieve and verify the first available private transaction for every sender + fn process_verification_queue(&self) -> Result<(), Error> { let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE); let local_accounts = HashSet::new(); let ready_transactions = self.transactions_for_verification.drain(self.pool_client(&nonce_cache, &local_accounts)); for transaction in ready_transactions { - if let Err(e) = process_transaction(&transaction) { - warn!(target: "privatetx", "Error: {:?}", e); + if let Err(err) = self.process_verification_transaction(&transaction) { + warn!(target: "privatetx", "Error: {:?}", err); + match err { + Error::PrivateStateNotFound => { + let contract = transaction.private_transaction.contract(); + trace!(target: "privatetx", "Private state for the contract {:?} not found, requesting from peers", &contract); + let request = RequestType::Verification(transaction); + self.request_private_state(&contract, request)?; + } + _ => {} + } } } Ok(()) @@ -383,6 +429,7 @@ impl Provider { let original_tx_hash = desc.original_transaction.hash(); if last.0 { + let contract = Self::contract_address_from_transaction(&desc.original_transaction)?; let mut signatures = desc.received_signatures.clone(); signatures.push(signed_tx.signature()); let rsv: Vec = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect(); @@ -411,7 +458,6 @@ impl Provider { } } // Notify about state changes - let contract = Self::contract_address_from_transaction(&desc.original_transaction)?; // TODO #9825 Usage of BlockId::Latest if self.get_contract_version(BlockId::Latest, &contract) >= PRIVATE_CONTRACT_WITH_NOTIFICATION_VER { match self.state_changes_notify(BlockId::Latest, &contract, &desc.original_transaction.sender(), desc.original_transaction.hash()) { @@ -489,6 +535,75 @@ impl Provider { self.notify(|notify| notify.broadcast(ChainMessageType::SignedPrivateTransaction(transaction_hash, message.clone()))); } + fn request_private_state(&self, address: &Address, request_type: RequestType) -> Result<(), Error> { + // Define the list of available contracts + let mut private_contracts = Vec::new(); + private_contracts.push(*address); + if let Some(key_server_account) = self.keys_provider.key_server_account() { + if let Some(available_contracts) = self.keys_provider.available_keys(BlockId::Latest, &key_server_account) { + for private_contract in available_contracts { + if private_contract == *address { + continue; + } + private_contracts.push(private_contract); + } + } + } + // Check states for the avaialble contracts, if they're outdated + let mut stalled_contracts_hashes: HashSet = HashSet::new(); + for address in private_contracts { + if let Ok(state_hash) = self.get_decrypted_state_from_contract(&address, BlockId::Latest) { + if state_hash.len() != H256::len_bytes() { + return Err(Error::StateIncorrect); + } + let state_hash = H256::from_slice(&state_hash); + if let Err(_) = self.state_storage.private_state_db().state(&state_hash) { + // State not found in the local db + stalled_contracts_hashes.insert(state_hash); + } + } + } + let hashes_to_sync = self.state_storage.add_request(request_type, stalled_contracts_hashes); + if !hashes_to_sync.is_empty() { + trace!(target: "privatetx", "Requesting states for the following hashes: {:?}", hashes_to_sync); + for hash in hashes_to_sync { + self.notify(|notify| notify.broadcast(ChainMessageType::PrivateStateRequest(hash))); + } + } + Ok(()) + } + + fn private_state_sync_completed(&self, hash: &H256) -> Result<(), Error> { + self.state_storage.state_sync_completed(hash); + if self.state_storage.requests_ready() { + trace!(target: "privatetx", "Private state sync completed, processing pending requests"); + let ready_requests = self.state_storage.drain_ready_requests(); + for request in ready_requests { + match request { + RequestType::Creation(transaction) => { + match self.create_private_transaction(transaction) { + Ok(receipt) => trace!(target: "privatetx", "Creation request processed, receipt: {:?}", receipt), + Err(e) => error!(target: "privatetx", "Cannot process creation request with error: {:?}", e), + } + } + RequestType::Verification(transaction) => { + if let Err(err) = self.process_verification_transaction(&transaction) { + warn!(target: "privatetx", "Error while processing pending verification request: {:?}", err); + match err { + Error::PrivateStateNotFound => { + let contract = transaction.private_transaction.contract(); + error!(target: "privatetx", "Cannot retrieve private state after sync for {:?}", &contract); + } + _ => {} + } + } + } + } + } + } + Ok(()) + } + fn iv_from_transaction(transaction: &SignedTransaction) -> H128 { let nonce = keccak(&transaction.nonce.rlp_bytes()); let (iv, _) = nonce.as_bytes().split_at(INIT_VEC_LEN); @@ -512,10 +627,28 @@ impl Provider { } fn get_decrypted_state(&self, address: &Address, block: BlockId) -> Result { + match self.use_offchain_storage { + true => { + let hashed_state = self.get_decrypted_state_from_contract(address, block)?; + if hashed_state.len() != H256::len_bytes() { + return Err(Error::StateIncorrect); + } + let hashed_state = H256::from_slice(&hashed_state); + let stored_state_data = self.state_storage.private_state_db().state(&hashed_state)?; + self.decrypt(address, &stored_state_data) + } + false => self.get_decrypted_state_from_contract(address, block), + } + } + + fn get_decrypted_state_from_contract(&self, address: &Address, block: BlockId) -> Result { let (data, decoder) = private_contract::functions::state::call(); let value = self.client.call_contract(block, *address, data)?; let state = decoder.decode(&value).map_err(|e| Error::Call(format!("Contract call failed {:?}", e)))?; - self.decrypt(address, &state) + match self.use_offchain_storage { + true => Ok(state), + false => self.decrypt(address, &state), + } } fn get_decrypted_code(&self, address: &Address, block: BlockId) -> Result { @@ -610,9 +743,14 @@ impl Provider { }; (enc_code, self.encrypt(&contract_address, &Self::iv_from_transaction(transaction), &Self::snapshot_from_storage(&storage))?) }; + let mut saved_state = encrypted_storage; + if self.use_offchain_storage { + // Save state into the storage and return its hash + saved_state = self.state_storage.private_state_db().save_state(&saved_state)?.0.to_vec(); + } Ok(PrivateExecutionResult { code: encrypted_code, - state: encrypted_storage, + state: saved_state, contract_address: contract_address, result, }) @@ -739,6 +877,21 @@ impl Provider { } } +impl IoHandler for Provider { + fn initialize(&self, io: &IoContext) { + if self.use_offchain_storage { + io.register_timer(STATE_RETRIEVAL_TIMER, STATE_RETRIEVAL_TICK).expect("Error registering state retrieval timer"); + } + } + + fn timeout(&self, _io: &IoContext, timer: TimerToken) { + match timer { + STATE_RETRIEVAL_TIMER => self.state_storage.tick(&self.logging), + _ => warn!("IO service triggered unregistered timer '{}'", timer), + } + } +} + pub trait Importer { /// Process received private transaction fn import_private_transaction(&self, _rlp: &[u8]) -> Result; @@ -747,6 +900,9 @@ pub trait Importer { /// /// Creates corresponding public transaction if last required signature collected and sends it to the chain fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result; + + /// Function called when requested private state retrieved from peer and saved to DB. + fn private_state_synced(&self, hash: &H256) -> Result<(), String>; } // TODO [ToDr] Offload more heavy stuff to the IoService thread. @@ -810,6 +966,23 @@ impl Importer for Arc { } Ok(private_hash) } + + fn private_state_synced(&self, hash: &H256) -> Result<(), String> { + trace!(target: "privatetx", "Private state synced, hash: {:?}", hash); + let provider = Arc::downgrade(self); + let completed_hash = *hash; + let result = self.channel.send(ClientIoMessage::execute(move |_| { + if let Some(provider) = provider.upgrade() { + if let Err(e) = provider.private_state_sync_completed(&completed_hash) { + warn!(target: "privatetx", "Unable to process the state synced signal: {}", e); + } + } + })); + if let Err(e) = result { + warn!(target: "privatetx", "Error sending private state synced message: {:?}", e); + } + Ok(()) + } } impl ChainNotify for Provider { diff --git a/ethcore/private-tx/src/log.rs b/ethcore/private-tx/src/log.rs index 2d2be0181..0d0979752 100644 --- a/ethcore/private-tx/src/log.rs +++ b/ethcore/private-tx/src/log.rs @@ -70,6 +70,10 @@ impl Default for MonoTime { pub enum PrivateTxStatus { /// Private tx was created but no validation received yet Created, + /// Private state not found locally and being retrived from peers + PrivateStateSync, + /// Retrieval of the private state failed, transaction not created + PrivateStateSyncFailed, /// Several validators (but not all) validated the transaction Validating, /// All validators has validated the private tx @@ -209,6 +213,17 @@ impl Logging { /// Logs the creation of the private transaction pub fn private_tx_created(&self, tx_hash: &H256, validators: &[Address]) { + let mut logs = self.logs.write(); + if let Some(transaction_log) = logs.get_mut(&tx_hash) { + if transaction_log.status == PrivateTxStatus::PrivateStateSync { + // Transaction was already created before, its private state was being retrieved + transaction_log.status = PrivateTxStatus::Created; + return; + } else { + error!(target: "privatetx", "Attempt to create a duplicate transaction"); + return; + } + } let mut validator_logs = Vec::new(); for account in validators { validator_logs.push(ValidatorLog { @@ -216,7 +231,6 @@ impl Logging { validation_timestamp: None, }); } - let mut logs = self.logs.write(); if logs.len() > MAX_JOURNAL_LEN { // Remove the oldest log if let Some(tx_hash) = logs.values() @@ -236,6 +250,22 @@ impl Logging { }); } + /// Private state retrieval started + pub fn private_state_request(&self, tx_hash: &H256) { + let mut logs = self.logs.write(); + if let Some(transaction_log) = logs.get_mut(&tx_hash) { + transaction_log.status = PrivateTxStatus::PrivateStateSync; + } + } + + /// Private state retrieval failed + pub fn private_state_sync_failed(&self, tx_hash: &H256) { + let mut logs = self.logs.write(); + if let Some(transaction_log) = logs.get_mut(&tx_hash) { + transaction_log.status = PrivateTxStatus::PrivateStateSyncFailed; + } + } + /// Logs the validation of the private transaction by one of its validators pub fn signature_added(&self, tx_hash: &H256, validator: &Address) { let mut logs = self.logs.write(); diff --git a/ethcore/private-tx/src/private_state_db.rs b/ethcore/private-tx/src/private_state_db.rs new file mode 100644 index 000000000..a701f7330 --- /dev/null +++ b/ethcore/private-tx/src/private_state_db.rs @@ -0,0 +1,62 @@ +// Copyright 2015-2019 Parity Technologies (UK) Ltd. +// This file is part of Parity Ethereum. + +// Parity Ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Ethereum. If not, see . + +use std::sync::Arc; +use ethereum_types::H256; +use bytes::Bytes; +use kvdb::{KeyValueDB, DBTransaction}; +use keccak_hasher::KeccakHasher; +use hash_db::Hasher; +use ethcore_db::COL_PRIVATE_TRANSACTIONS_STATE; +use error::Error; + +/// Wrapper around local db with private state for sync purposes +pub struct PrivateStateDB { + db: Arc, +} + +impl PrivateStateDB { + /// Constructs the object + pub fn new(db: Arc) -> Self { + PrivateStateDB { + db, + } + } + + /// Returns saved state for the hash + pub fn state(&self, state_hash: &H256) -> Result { + trace!(target: "privatetx", "Retrieve private state from db with hash: {:?}", state_hash); + self.db.get(COL_PRIVATE_TRANSACTIONS_STATE, state_hash.as_bytes()) + .expect("Low-level database error. Some issue with your hard disk?") + .map(|s| s.to_vec()) + .ok_or(Error::PrivateStateNotFound) + } + + /// Stores state for the hash + pub fn save_state(&self, storage: &Bytes) -> Result { + let state_hash = self.state_hash(storage)?; + let mut transaction = DBTransaction::new(); + transaction.put(COL_PRIVATE_TRANSACTIONS_STATE, state_hash.as_bytes(), storage); + self.db.write(transaction).map_err(|_| Error::DatabaseWriteError)?; + trace!(target: "privatetx", "Private state saved to db, its hash: {:?}", state_hash); + Ok(state_hash) + } + + /// Returns state's hash without committing it to DB + pub fn state_hash(&self, state: &Bytes) -> Result { + Ok(KeccakHasher::hash(state)) + } +} \ No newline at end of file diff --git a/ethcore/private-tx/src/state_store.rs b/ethcore/private-tx/src/state_store.rs new file mode 100644 index 000000000..002dca365 --- /dev/null +++ b/ethcore/private-tx/src/state_store.rs @@ -0,0 +1,165 @@ +// Copyright 2015-2019 Parity Technologies (UK) Ltd. +// This file is part of Parity Ethereum. + +// Parity Ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Ethereum. If not, see . + +use std::collections::{HashSet, HashMap}; +use std::sync::Arc; +use std::time::{Instant, Duration}; +use parking_lot::RwLock; +use ethereum_types::H256; +use kvdb::KeyValueDB; +use types::transaction::SignedTransaction; +use private_transactions::VerifiedPrivateTransaction; +use private_state_db::PrivateStateDB; +use log::Logging; + +/// Max duration of retrieving state (in ms) +const MAX_REQUEST_SESSION_DURATION: u64 = 120 * 1000; + +/// Type of the stored reques +pub enum RequestType { + /// Verification of private transaction + Verification(Arc), + /// Creation of the private transaction + Creation(SignedTransaction), +} + +#[derive(Clone, PartialEq)] +enum RequestState { + Syncing, + Ready, +} + +struct StateRequest { + request_type: RequestType, + request_hashes: HashSet, + state: RequestState, +} + +/// Wrapper over storage for the private states +pub struct PrivateStateStorage { + private_state_db: Arc, + requests: RwLock>, + syncing_hashes: RwLock>, +} + +impl PrivateStateStorage { + /// Constructs the object + pub fn new(db: Arc) -> Self { + PrivateStateStorage { + private_state_db: Arc::new(PrivateStateDB::new(db)), + requests: RwLock::new(Vec::new()), + syncing_hashes: RwLock::default(), + } + } + + /// Checks if ready for processing requests exist in queue + pub fn requests_ready(&self) -> bool { + let requests = self.requests.read(); + requests.iter().find(|r| r.state == RequestState::Ready).is_some() + } + + /// Signals that corresponding private state retrieved and added into the local db + pub fn state_sync_completed(&self, synced_state_hash: &H256) { + let mut syncing_hashes = self.syncing_hashes.write(); + syncing_hashes.remove(synced_state_hash); + self.mark_hash_ready(synced_state_hash); + } + + /// Returns underlying DB + pub fn private_state_db(&self) -> Arc { + self.private_state_db.clone() + } + + /// Store a request for state's sync and later processing, returns new hashes, which sync is required + pub fn add_request(&self, request_type: RequestType, request_hashes: HashSet) -> Vec { + let request = StateRequest { + request_type: request_type, + request_hashes: request_hashes.clone(), + state: RequestState::Syncing, + }; + let mut requests = self.requests.write(); + requests.push(request); + let mut new_hashes = Vec::new(); + for hash in request_hashes { + let mut hashes = self.syncing_hashes.write(); + if hashes.insert(hash, Instant::now() + Duration::from_millis(MAX_REQUEST_SESSION_DURATION)).is_none() { + new_hashes.push(hash); + } + } + new_hashes + } + + /// Drains ready requests to process + pub fn drain_ready_requests(&self) -> Vec { + let mut requests_queue = self.requests.write(); + let mut drained = Vec::new(); + let mut i = 0; + while i != requests_queue.len() { + if requests_queue[i].state == RequestState::Ready { + let request = requests_queue.remove(i); + drained.push(request.request_type); + } else { + i += 1; + } + } + drained + } + + /// State retrieval timer's tick + pub fn tick(&self, logging: &Option) { + let mut syncing_hashes = self.syncing_hashes.write(); + let current_time = Instant::now(); + syncing_hashes + .iter() + .filter(|&(_, expiration_time)| *expiration_time >= current_time) + .for_each(|(hash, _)| self.mark_hash_stale(&hash, logging)); + syncing_hashes.retain(|_, expiration_time| *expiration_time < current_time); + } + + fn mark_hash_ready(&self, ready_hash: &H256) { + let mut requests = self.requests.write(); + for request in requests.iter_mut() { + request.request_hashes.remove(ready_hash); + if request.request_hashes.is_empty() && request.state == RequestState::Syncing { + request.state = RequestState::Ready; + } + } + } + + fn mark_hash_stale(&self, stale_hash: &H256, logging: &Option) { + let mut requests = self.requests.write(); + requests.retain(|request| { + let mut delete_request = false; + if request.request_hashes.contains(stale_hash) { + let tx_hash; + match &request.request_type { + RequestType::Verification(transaction) => { + tx_hash = transaction.transaction_hash; + } + RequestType::Creation(transaction) => { + tx_hash = transaction.hash(); + if let Some(ref logging) = logging { + logging.private_state_sync_failed(&tx_hash); + } + } + } + trace!(target: "privatetx", "Private state request for {:?} staled due to timeout", &tx_hash); + delete_request = true; + } + !delete_request + }); + } +} \ No newline at end of file diff --git a/ethcore/private-tx/tests/private_contract.rs b/ethcore/private-tx/tests/private_contract.rs index e4ee445df..77a32c049 100644 --- a/ethcore/private-tx/tests/private_contract.rs +++ b/ethcore/private-tx/tests/private_contract.rs @@ -37,7 +37,7 @@ use types::ids::BlockId; use types::transaction::{Transaction, Action}; use ethcore::{ CreateContractAddress, - test_helpers::{generate_dummy_client, push_block_with_transactions}, + test_helpers::{generate_dummy_client, push_block_with_transactions, new_db}, miner::Miner, spec, }; @@ -65,11 +65,13 @@ fn private_contract() { validator_accounts: vec![key3.address(), key4.address()], signer_account: None, logs_path: None, + use_offchain_storage: false, }; let io = ethcore_io::IoChannel::disconnected(); let miner = Arc::new(Miner::new_for_tests(&spec::new_test(), None)); let private_keys = Arc::new(StoringKeyProvider::default()); + let db = new_db(); let pm = Arc::new(Provider::new( client.clone(), miner, @@ -78,6 +80,7 @@ fn private_contract() { config, io, private_keys, + db.key_value().clone(), )); let (address, _) = contract_address(CreateContractAddress::FromSenderAndNonce, &key1.address(), &0.into(), &[]); @@ -200,11 +203,13 @@ fn call_other_private_contract() { validator_accounts: vec![key3.address(), key4.address()], signer_account: None, logs_path: None, + use_offchain_storage: false, }; let io = ethcore_io::IoChannel::disconnected(); let miner = Arc::new(Miner::new_for_tests(&spec::new_test(), None)); let private_keys = Arc::new(StoringKeyProvider::default()); + let db = new_db(); let pm = Arc::new(Provider::new( client.clone(), miner, @@ -213,6 +218,7 @@ fn call_other_private_contract() { config, io, private_keys.clone(), + db.key_value().clone(), )); // Deploy contract A diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index 737d77254..9d3c8703b 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -73,6 +73,16 @@ impl PrivateTxHandler for PrivateTxService { } } } + + fn private_state_synced(&self, hash: &H256) -> Result<(), String> { + match self.provider.private_state_synced(hash) { + Ok(handle_result) => Ok(handle_result), + Err(err) => { + warn!(target: "privatetx", "Unable to handle private state synced message: {}", err); + return Err(err.to_string()) + } + } + } } /// Client service setup. Creates and registers client and network services with the IO subsystem. @@ -139,8 +149,10 @@ impl ClientService { private_tx_conf, io_service.channel(), private_keys, + blockchain_db.key_value().clone(), )); - let private_tx = Arc::new(PrivateTxService::new(provider)); + let private_tx = Arc::new(PrivateTxService::new(provider.clone())); + io_service.register_handler(provider)?; let client_io = Arc::new(ClientIoHandler { client: client.clone(), diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 5f9b8ed31..1567f6c5c 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -29,6 +29,8 @@ pub enum ChainMessageType { PrivateTransaction(H256, Vec), /// Message with signed private transaction SignedPrivateTransaction(H256, Vec), + /// Private state request for the particular private contract + PrivateStateRequest(H256), } /// Route type to indicate whether it is enacted or retracted. diff --git a/ethcore/src/test_helpers.rs b/ethcore/src/test_helpers.rs index 32a5f7778..4ab63846a 100644 --- a/ethcore/src/test_helpers.rs +++ b/ethcore/src/test_helpers.rs @@ -514,6 +514,7 @@ impl ChainNotify for TestNotify { ChainMessageType::Consensus(data) => data, ChainMessageType::SignedPrivateTransaction(_, data) => data, ChainMessageType::PrivateTransaction(_, data) => data, + ChainMessageType::PrivateStateRequest(_) => Vec::new(), }; self.messages.write().push(data); } diff --git a/ethcore/sync/Cargo.toml b/ethcore/sync/Cargo.toml index df35a4bab..e0f1a3fe5 100644 --- a/ethcore/sync/Cargo.toml +++ b/ethcore/sync/Cargo.toml @@ -18,6 +18,7 @@ ethcore-light = { path = "../light" } ethcore-network = { path = "../../util/network" } ethcore-network-devp2p = { path = "../../util/network-devp2p" } ethereum-types = "0.6.0" +ethcore-private-tx = { path = "../private-tx" } ethkey = { path = "../../accounts/ethkey" } ethstore = { path = "../../accounts/ethstore" } fastmap = { path = "../../util/fastmap" } @@ -42,7 +43,6 @@ parity-runtime = { path = "../../util/runtime" } env_logger = "0.5" ethcore = { path = "..", features = ["test-helpers"] } ethcore-io = { path = "../../util/io", features = ["mio"] } -ethcore-private-tx = { path = "../private-tx" } kvdb-memorydb = "0.1" rustc-hex = "1.0" rand_xorshift = "0.1.1" diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 3a662f8cb..eeb6cb41d 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -35,6 +35,7 @@ use ethkey::Secret; use ethcore::client::{ChainNotify, NewBlocks, ChainMessageType}; use client_traits::BlockChainClient; use ethcore::snapshot::SnapshotService; +use ethcore_private_tx::PrivateStateDB; use types::BlockNumber; use sync_io::NetSyncIo; use chain::{ChainSyncApi, SyncStatus as EthSyncStatus}; @@ -42,7 +43,7 @@ use std::net::{SocketAddr, AddrParseError}; use std::str::FromStr; use parking_lot::{RwLock, Mutex}; use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62, - PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, SyncState}; + PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4, SyncState}; use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket}; use light::client::AsLightClient; use light::Provider; @@ -263,6 +264,8 @@ pub struct Params { pub snapshot_service: Arc, /// Private tx service. pub private_tx_handler: Option>, + /// Private state wrapper + pub private_state: Option>, /// Light data provider. pub provider: Arc, /// Network layer configuration. @@ -377,6 +380,7 @@ impl EthSync { chain: params.chain, snapshot_service: params.snapshot_service, overlay: RwLock::new(HashMap::new()), + private_state: params.private_state, }), light_proto: light_proto, subprotocol_name: params.config.subprotocol_name, @@ -460,6 +464,8 @@ struct SyncProtocolHandler { sync: ChainSyncApi, /// Chain overlay used to cache data such as fork block. overlay: RwLock>, + /// Private state db + private_state: Option>, } impl NetworkProtocolHandler for SyncProtocolHandler { @@ -475,7 +481,12 @@ impl NetworkProtocolHandler for SyncProtocolHandler { } fn read(&self, io: &dyn NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { - self.sync.dispatch_packet(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data); + self.sync.dispatch_packet(&mut NetSyncIo::new(io, + &*self.chain, + &*self.snapshot_service, + &self.overlay, + self.private_state.clone()), + *peer, packet_id, data); } fn connected(&self, io: &dyn NetworkContext, peer: &PeerId) { @@ -484,20 +495,30 @@ impl NetworkProtocolHandler for SyncProtocolHandler { let warp_protocol = io.protocol_version(WARP_SYNC_PROTOCOL_ID, *peer).unwrap_or(0) != 0; let warp_context = io.subprotocol_name() == WARP_SYNC_PROTOCOL_ID; if warp_protocol == warp_context { - self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer); + self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, + &*self.chain, + &*self.snapshot_service, + &self.overlay, + self.private_state.clone()), + *peer); } } fn disconnected(&self, io: &dyn NetworkContext, peer: &PeerId) { trace_time!("sync::disconnected"); if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { - self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer); + self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, + &*self.chain, + &*self.snapshot_service, + &self.overlay, + self.private_state.clone()), + *peer); } } fn timeout(&self, io: &dyn NetworkContext, timer: TimerToken) { trace_time!("sync::timeout"); - let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay); + let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay, self.private_state.clone()); match timer { PEERS_TIMER => self.sync.write().maintain_peers(&mut io), MAINTAIN_SYNC_TIMER => self.sync.write().maintain_sync(&mut io), @@ -528,8 +549,11 @@ impl ChainNotify for EthSync { use light::net::Announcement; self.network.with_context(self.subprotocol_name, |context| { - let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service, - &self.eth_handler.overlay); + let mut sync_io = NetSyncIo::new(context, + &*self.eth_handler.chain, + &*self.eth_handler.snapshot_service, + &self.eth_handler.overlay, + self.eth_handler.private_state.clone()); self.eth_handler.sync.write().chain_new_blocks( &mut sync_io, &new_blocks.imported, @@ -576,7 +600,7 @@ impl ChainNotify for EthSync { self.network.register_protocol(self.eth_handler.clone(), self.subprotocol_name, &[ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63]) .unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e)); // register the warp sync subprotocol - self.network.register_protocol(self.eth_handler.clone(), WARP_SYNC_PROTOCOL_ID, &[PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3]) + self.network.register_protocol(self.eth_handler.clone(), WARP_SYNC_PROTOCOL_ID, &[PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4]) .unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e)); // register the light protocol. @@ -593,13 +617,19 @@ impl ChainNotify for EthSync { fn broadcast(&self, message_type: ChainMessageType) { self.network.with_context(WARP_SYNC_PROTOCOL_ID, |context| { - let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service, &self.eth_handler.overlay); + let mut sync_io = NetSyncIo::new(context, + &*self.eth_handler.chain, + &*self.eth_handler.snapshot_service, + &self.eth_handler.overlay, + self.eth_handler.private_state.clone()); match message_type { ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message), ChainMessageType::PrivateTransaction(transaction_hash, message) => self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PrivateTransactionPacket, message), ChainMessageType::SignedPrivateTransaction(transaction_hash, message) => self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SignedPrivateTransactionPacket, message), + ChainMessageType::PrivateStateRequest(hash) => + self.eth_handler.sync.write().request_private_state(&mut sync_io, &hash), } }); } @@ -664,7 +694,11 @@ impl ManageNetwork for EthSync { fn stop_network(&self) { self.network.with_context(self.subprotocol_name, |context| { - let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service, &self.eth_handler.overlay); + let mut sync_io = NetSyncIo::new(context, + &*self.eth_handler.chain, + &*self.eth_handler.snapshot_service, + &self.eth_handler.overlay, + self.eth_handler.private_state.clone()); self.eth_handler.sync.write().abort(&mut sync_io); }); diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index b17334b48..a2b5650ed 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -691,7 +691,7 @@ mod tests { let mut chain = TestBlockChainClient::new(); let snapshot_service = TestSnapshotService::new(); let queue = RwLock::new(VecDeque::new()); - let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); + let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None, None); // Valid headers sequence. let valid_headers = [ @@ -757,7 +757,7 @@ mod tests { let mut chain = TestBlockChainClient::new(); let snapshot_service = TestSnapshotService::new(); let queue = RwLock::new(VecDeque::new()); - let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); + let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None, None); let mut headers = Vec::with_capacity(3); let parent_hash = H256::random(); @@ -807,7 +807,7 @@ mod tests { let mut chain = TestBlockChainClient::new(); let snapshot_service = TestSnapshotService::new(); let queue = RwLock::new(VecDeque::new()); - let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); + let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None, None); // Import block headers. let mut headers = Vec::with_capacity(4); @@ -875,7 +875,7 @@ mod tests { let mut chain = TestBlockChainClient::new(); let snapshot_service = TestSnapshotService::new(); let queue = RwLock::new(VecDeque::new()); - let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); + let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None, None); // Import block headers. let mut headers = Vec::with_capacity(4); @@ -940,7 +940,7 @@ mod tests { let mut chain = TestBlockChainClient::new(); let snapshot_service = TestSnapshotService::new(); let queue = RwLock::new(VecDeque::new()); - let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); + let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None, None); let heads = [ spec.genesis_header(), @@ -980,7 +980,7 @@ mod tests { let mut chain = TestBlockChainClient::new(); let snapshot_service = TestSnapshotService::new(); let queue = RwLock::new(VecDeque::new()); - let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); + let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None, None); let heads = [ spec.genesis_header() diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 659720e3f..27da6931f 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -48,6 +48,7 @@ use super::sync_packet::SyncPacket::{ SnapshotDataPacket, PrivateTransactionPacket, SignedPrivateTransactionPacket, + PrivateStatePacket, }; use super::{ @@ -65,6 +66,7 @@ use super::{ MAX_NEW_HASHES, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, + PAR_PROTOCOL_VERSION_4, }; /// The Chain Sync Handler: handles responses from peers @@ -86,6 +88,7 @@ impl SyncHandler { SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp), PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp), SignedPrivateTransactionPacket => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp), + PrivateStatePacket => SyncHandler::on_private_state_data(sync, io, peer, &rlp), _ => { debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id()); Ok(()) @@ -585,6 +588,7 @@ impl SyncHandler { asking: PeerAsking::Nothing, asking_blocks: Vec::new(), asking_hash: None, + asking_private_state: None, ask_time: Instant::now(), last_sent_transactions: Default::default(), last_sent_private_transactions: Default::default(), @@ -635,7 +639,7 @@ impl SyncHandler { } if false - || (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_3.0)) + || (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_4.0)) || (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_62.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_63.0)) { trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version); @@ -696,7 +700,7 @@ impl SyncHandler { return Ok(()); } }; - trace!(target: "sync", "Received signed private transaction packet from {:?}", peer_id); + trace!(target: "privatetx", "Received signed private transaction packet from {:?}", peer_id); match private_handler.import_signed_private_transaction(r.as_raw()) { Ok(transaction_hash) => { //don't send the packet back @@ -705,7 +709,7 @@ impl SyncHandler { } }, Err(e) => { - trace!(target: "sync", "Ignoring the message, error queueing: {}", e); + trace!(target: "privatetx", "Ignoring the message, error queueing: {}", e); } } Ok(()) @@ -724,7 +728,7 @@ impl SyncHandler { return Ok(()); } }; - trace!(target: "sync", "Received private transaction packet from {:?}", peer_id); + trace!(target: "privatetx", "Received private transaction packet from {:?}", peer_id); match private_handler.import_private_transaction(r.as_raw()) { Ok(transaction_hash) => { //don't send the packet back @@ -733,11 +737,63 @@ impl SyncHandler { } }, Err(e) => { - trace!(target: "sync", "Ignoring the message, error queueing: {}", e); + trace!(target: "privatetx", "Ignoring the message, error queueing: {}", e); } } Ok(()) } + + fn on_private_state_data(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { + if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { + trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id); + return Ok(()); + } + if !sync.reset_peer_asking(peer_id, PeerAsking::PrivateState) { + trace!(target: "sync", "{}: Ignored unexpected private state data", peer_id); + return Ok(()); + } + let requested_hash = sync.peers.get(&peer_id).and_then(|p| p.asking_private_state); + let requested_hash = match requested_hash { + Some(hash) => hash, + None => { + debug!(target: "sync", "{}: Ignored unexpected private state (requested_hash is None)", peer_id); + return Ok(()); + } + }; + let private_handler = match sync.private_tx_handler { + Some(ref handler) => handler, + None => { + trace!(target: "sync", "{} Ignoring private tx packet from peer", peer_id); + return Ok(()); + } + }; + trace!(target: "privatetx", "Received private state data packet from {:?}", peer_id); + let private_state_data: Bytes = r.val_at(0)?; + match io.private_state() { + Some(db) => { + // Check hash of the rececived data before submitting it to DB + let received_hash = db.state_hash(&private_state_data).unwrap_or_default(); + if received_hash != requested_hash { + trace!(target: "sync", "{} Ignoring private state data with unexpected hash from peer", peer_id); + return Ok(()); + } + match db.save_state(&private_state_data) { + Ok(hash) => { + if let Err(err) = private_handler.private_state_synced(&hash) { + trace!(target: "privatetx", "Ignoring received private state message, error queueing: {}", err); + } + } + Err(e) => { + error!(target: "privatetx", "Cannot save received private state {:?}", e); + } + } + } + None => { + trace!(target: "sync", "{} Ignoring private tx packet from peer", peer_id); + } + }; + Ok(()) + } } #[cfg(test)] @@ -765,7 +821,7 @@ mod tests { let queue = RwLock::new(VecDeque::new()); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let hashes_data = get_dummy_hashes(); let hashes_rlp = Rlp::new(&hashes_data); @@ -786,7 +842,7 @@ mod tests { let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); //sync.have_common_block = true; let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let block = Rlp::new(&block_data); @@ -805,7 +861,7 @@ mod tests { let queue = RwLock::new(VecDeque::new()); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let block = Rlp::new(&block_data); @@ -819,7 +875,7 @@ mod tests { let queue = RwLock::new(VecDeque::new()); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let empty_data = vec![]; let block = Rlp::new(&empty_data); @@ -836,7 +892,7 @@ mod tests { let queue = RwLock::new(VecDeque::new()); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let empty_hashes_data = vec![]; let hashes_rlp = Rlp::new(&empty_hashes_data); diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 1b40649ec..d71cd9f84 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -152,6 +152,8 @@ pub const PAR_PROTOCOL_VERSION_1: (u8, u8) = (1, 0x15); pub const PAR_PROTOCOL_VERSION_2: (u8, u8) = (2, 0x16); /// 3 version of Parity protocol (private transactions messages added). pub const PAR_PROTOCOL_VERSION_3: (u8, u8) = (3, 0x18); +/// 4 version of Parity protocol (private state sync added). +pub const PAR_PROTOCOL_VERSION_4: (u8, u8) = (4, 0x20); pub const MAX_BODIES_TO_SEND: usize = 256; pub const MAX_HEADERS_TO_SEND: usize = 512; @@ -179,6 +181,7 @@ const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10); const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3); const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5); const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120); +const PRIVATE_STATE_TIMEOUT: Duration = Duration::from_secs(120); /// Defines how much time we have to complete priority transaction or block propagation. /// after the deadline is reached the task is considered finished @@ -277,6 +280,7 @@ pub enum PeerAsking { BlockReceipts, SnapshotManifest, SnapshotData, + PrivateState, } #[derive(PartialEq, Eq, Debug, Clone, Copy, MallocSizeOf)] @@ -316,6 +320,8 @@ pub struct PeerInfo { asking_blocks: Vec, /// Holds requested header hash if currently requesting block header by hash asking_hash: Option, + /// Holds requested private state hash + asking_private_state: Option, /// Holds requested snapshot chunk hash if any. asking_snapshot_data: Option, /// Request timestamp @@ -352,6 +358,7 @@ impl PeerInfo { fn reset_asking(&mut self) { self.asking_blocks.clear(); self.asking_hash = None; + self.asking_private_state = None; // mark any pending requests as expired if self.asking != PeerAsking::Nothing && self.is_allowed() { self.expired = true; @@ -1185,6 +1192,7 @@ impl ChainSync { PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT, PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT, PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT, + PeerAsking::PrivateState => elapsed > PRIVATE_STATE_TIMEOUT, }; if timeout { debug!(target:"sync", "Timeout {}", peer_id); @@ -1291,6 +1299,18 @@ impl ChainSync { ).collect() } + fn get_private_state_peers(&self) -> Vec { + self.peers.iter().filter_map( + |(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_4.0 + && p.private_tx_enabled + && self.active_peers.contains(id) { + Some(*id) + } else { + None + } + ).collect() + } + /// Maintain other peers. Send out any new blocks and transactions pub fn maintain_sync(&mut self, io: &mut dyn SyncIo) { self.maybe_start_snapshot_sync(io); @@ -1360,6 +1380,19 @@ impl ChainSync { pub fn propagate_private_transaction(&mut self, io: &mut dyn SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) { SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet); } + + /// Request private state from peers + pub fn request_private_state(&mut self, io: &mut SyncIo, hash: &H256) { + let private_state_peers = self.get_private_state_peers(); + if private_state_peers.is_empty() { + error!(target: "privatetx", "Cannot request private state, no peers with private tx enabled available"); + } else { + trace!(target: "privatetx", "Requesting private stats from {:?}", private_state_peers); + for peer_id in private_state_peers { + SyncRequester::request_private_state(self, io, peer_id, hash); + } + } + } } #[cfg(test)] @@ -1480,6 +1513,7 @@ pub mod tests { asking: PeerAsking::Nothing, asking_blocks: Vec::new(), asking_hash: None, + asking_private_state: None, ask_time: Instant::now(), last_sent_transactions: Default::default(), last_sent_private_transactions: Default::default(), @@ -1533,7 +1567,7 @@ pub mod tests { let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let chain_info = client.chain_info(); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let peers = sync.get_lagging_peers(&chain_info); SyncPropagator::propagate_new_hashes(&mut sync, &chain_info, &mut io, &peers); @@ -1553,7 +1587,7 @@ pub mod tests { let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let chain_info = client.chain_info(); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let peers = sync.get_lagging_peers(&chain_info); SyncPropagator::propagate_blocks(&mut sync, &chain_info, &mut io, &[], &peers); @@ -1591,7 +1625,7 @@ pub mod tests { { let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks, false); sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]); assert_eq!(io.chain.miner.ready_transactions(io.chain, 10, PendingOrdering::Priority).len(), 1); @@ -1604,7 +1638,7 @@ pub mod tests { { let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&client, &ss, &queue, None); + let mut io = TestIo::new(&client, &ss, &queue, None, None); io.chain.miner.chain_new_blocks(io.chain, &[], &[], &good_blocks, &retracted_blocks, false); sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[], &[]); } @@ -1627,7 +1661,7 @@ pub mod tests { let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); // when sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]); diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index e347d81c8..1afb5e92b 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -355,7 +355,7 @@ mod tests { let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let chain_info = client.chain_info(); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let peers = sync.get_lagging_peers(&chain_info); let peer_count = SyncPropagator::propagate_new_hashes(&mut sync, &chain_info, &mut io, &peers); @@ -376,7 +376,7 @@ mod tests { let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let chain_info = client.chain_info(); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let peers = sync.get_lagging_peers(&chain_info); let peer_count = SyncPropagator::propagate_blocks(&mut sync, &chain_info, &mut io, &[], &peers); @@ -397,7 +397,7 @@ mod tests { let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let chain_info = client.chain_info(); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let peers = sync.get_lagging_peers(&chain_info); let peer_count = SyncPropagator::propagate_blocks(&mut sync ,&chain_info, &mut io, &[hash.clone()], &peers); @@ -427,6 +427,7 @@ mod tests { asking: PeerAsking::Nothing, asking_blocks: Vec::new(), asking_hash: None, + asking_private_state: None, ask_time: Instant::now(), last_sent_transactions: Default::default(), last_sent_private_transactions: Default::default(), @@ -440,7 +441,7 @@ mod tests { client_version: ClientVersion::from(""), }); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); SyncPropagator::propagate_proposed_blocks(&mut sync, &mut io, &[block]); // 1 message should be sent @@ -457,7 +458,7 @@ mod tests { let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client); let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // Try to propagate same transactions for the second time let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); @@ -484,7 +485,7 @@ mod tests { let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client); let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); io.chain.insert_transaction_to_queue(); // New block import should not trigger propagation. @@ -508,7 +509,7 @@ mod tests { let mut sync = ChainSync::new(SyncConfig::default(), &client, None); let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]); // Try to propagate same transactions for the second time @@ -529,7 +530,7 @@ mod tests { let ss = TestSnapshotService::new(); // should sent some { - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); assert_eq!(1, io.packets.len()); assert_eq!(1, peer_count); @@ -537,7 +538,7 @@ mod tests { // Insert some more client.insert_transaction_to_queue(); let (peer_count2, peer_count3) = { - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); // Propagate new transactions let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // And now the peer should have all transactions @@ -563,7 +564,7 @@ mod tests { let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client); let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); let stats = sync.transactions_stats(); @@ -578,7 +579,7 @@ mod tests { let mut sync = ChainSync::new(SyncConfig::default(), &client, None); let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); // when peer#1 is Geth insert_dummy_peer(&mut sync, 1, block_hash); @@ -608,7 +609,7 @@ mod tests { let mut sync = ChainSync::new(SyncConfig::default(), &client, None); let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); // when peer#1 is Parity, accepting service transactions insert_dummy_peer(&mut sync, 1, block_hash); diff --git a/ethcore/sync/src/chain/requester.rs b/ethcore/sync/src/chain/requester.rs index 6ff720823..b47c39301 100644 --- a/ethcore/sync/src/chain/requester.rs +++ b/ethcore/sync/src/chain/requester.rs @@ -30,6 +30,7 @@ use super::sync_packet::SyncPacket::{ GetReceiptsPacket, GetSnapshotManifestPacket, GetSnapshotDataPacket, + GetPrivateStatePacket, }; use super::{ @@ -99,6 +100,15 @@ impl SyncRequester { SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GetSnapshotManifestPacket, rlp.out()); } + pub fn request_private_state(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, hash: &H256) { + trace!(target: "privatetx", "{} <- GetPrivateStatePacket", peer_id); + let mut rlp = RlpStream::new_list(1); + rlp.append(hash); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::PrivateState, GetPrivateStatePacket, rlp.out()); + let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); + peer.asking_private_state = Some(hash.clone()); + } + /// Request headers from a peer by block hash fn request_headers_by_hash(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, h: &H256, count: u64, skip: u64, reverse: bool, set: BlockSet) { trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}, set = {:?}", peer_id, count, h, set); diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index e802cb0e0..a6a95f07a 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -43,6 +43,8 @@ use super::sync_packet::SyncPacket::{ GetSnapshotDataPacket, SnapshotDataPacket, ConsensusDataPacket, + GetPrivateStatePacket, + PrivateStatePacket, }; use super::{ @@ -98,6 +100,11 @@ impl SyncSupplier { SyncSupplier::return_snapshot_data, |e| format!("Error sending snapshot data: {:?}", e)), + GetPrivateStatePacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_private_state, + |e| format!("Error sending private state data: {:?}", e)), + StatusPacket => { sync.write().on_packet(io, peer, packet_id, data); Ok(()) @@ -348,6 +355,26 @@ impl SyncSupplier { Ok(Some((SnapshotDataPacket.id(), rlp))) } + /// Respond to GetPrivateStatePacket + fn return_private_state(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { + let hash: H256 = r.val_at(0)?; + trace!(target: "privatetx", "{} -> GetPrivateStatePacket {:?}", peer_id, hash); + io.private_state().map_or(Ok(None), |db| { + let state = db.state(&hash); + match state { + Err(err) => { + trace!(target: "privatetx", "Cannot retrieve state from db {:?}", err); + Ok(None) + } + Ok(bytes) => { + let mut rlp = RlpStream::new_list(1); + rlp.append(&bytes); + Ok(Some((PrivateStatePacket.id(), rlp))) + } + } + }) + } + fn return_rlp(io: &mut dyn SyncIo, rlp: &Rlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError> where FRlp : Fn(&dyn SyncIo, &Rlp, PeerId) -> RlpResponseResult, FError : FnOnce(network::Error) -> String @@ -412,7 +439,7 @@ mod test { let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); - let io = TestIo::new(&mut client, &ss, &queue, None); + let io = TestIo::new(&mut client, &ss, &queue, None, None); let unknown: H256 = H256::zero(); let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_hash_req(&unknown, 1, 0, false)), 0); @@ -470,7 +497,7 @@ mod test { let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); - let io = TestIo::new(&mut client, &ss, &queue, None); + let io = TestIo::new(&mut client, &ss, &queue, None, None); let small_result = SyncSupplier::return_block_bodies(&io, &Rlp::new(&small_rlp_request.out()), 0); let small_result = small_result.unwrap().unwrap().1; @@ -487,7 +514,7 @@ mod test { let queue = RwLock::new(VecDeque::new()); let sync = dummy_sync_with_peer(H256::zero(), &client); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let mut node_list = RlpStream::new_list(3); node_list.append(&H256::from_str("0000000000000000000000000000000000000000000000005555555555555555").unwrap()); @@ -518,7 +545,7 @@ mod test { let mut client = TestBlockChainClient::new(); let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); - let io = TestIo::new(&mut client, &ss, &queue, None); + let io = TestIo::new(&mut client, &ss, &queue, None, None); let result = SyncSupplier::return_receipts(&io, &Rlp::new(&[0xc0]), 0); @@ -531,7 +558,7 @@ mod test { let queue = RwLock::new(VecDeque::new()); let sync = dummy_sync_with_peer(H256::zero(), &client); let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let mut io = TestIo::new(&mut client, &ss, &queue, None, None); let mut receipt_list = RlpStream::new_list(4); receipt_list.append(&H256::from_str("0000000000000000000000000000000000000000000000005555555555555555").unwrap()); diff --git a/ethcore/sync/src/chain/sync_packet.rs b/ethcore/sync/src/chain/sync_packet.rs index f3aa11119..c31ce7876 100644 --- a/ethcore/sync/src/chain/sync_packet.rs +++ b/ethcore/sync/src/chain/sync_packet.rs @@ -55,6 +55,8 @@ enum_from_primitive! { ConsensusDataPacket = 0x15, PrivateTransactionPacket = 0x16, SignedPrivateTransactionPacket = 0x17, + GetPrivateStatePacket = 0x18, + PrivateStatePacket = 0x19, } } @@ -94,7 +96,9 @@ impl PacketInfo for SyncPacket { SnapshotDataPacket | ConsensusDataPacket | PrivateTransactionPacket | - SignedPrivateTransactionPacket + SignedPrivateTransactionPacket | + GetPrivateStatePacket | + PrivateStatePacket => WARP_SYNC_PROTOCOL_ID, } diff --git a/ethcore/sync/src/lib.rs b/ethcore/sync/src/lib.rs index 674f0db44..eb1cdb98a 100644 --- a/ethcore/sync/src/lib.rs +++ b/ethcore/sync/src/lib.rs @@ -35,6 +35,7 @@ extern crate keccak_hash as hash; extern crate parity_bytes as bytes; extern crate parity_runtime; extern crate parking_lot; +extern crate ethcore_private_tx; extern crate rand; extern crate rlp; extern crate triehash_ethereum; @@ -43,7 +44,6 @@ extern crate futures; extern crate ethcore_light as light; #[cfg(test)] extern crate env_logger; -#[cfg(test)] extern crate ethcore_private_tx; #[cfg(test)] extern crate kvdb_memorydb; #[cfg(test)] extern crate rustc_hex; #[cfg(test)] extern crate rand_xorshift; diff --git a/ethcore/sync/src/private_tx.rs b/ethcore/sync/src/private_tx.rs index 121ad081f..5da3b172d 100644 --- a/ethcore/sync/src/private_tx.rs +++ b/ethcore/sync/src/private_tx.rs @@ -26,6 +26,9 @@ pub trait PrivateTxHandler: Send + Sync + 'static { /// Function called on new signed private transaction received. /// Returns the hash of the imported transaction fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result; + + /// Function called when requested private state retrieved from peer and saved to DB. + fn private_state_synced(&self, hash: &H256) -> Result<(), String>; } /// Nonoperative private transaction handler. @@ -39,6 +42,10 @@ impl PrivateTxHandler for NoopPrivateTxHandler { fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result { Ok(H256::zero()) } + + fn private_state_synced(&self, _hash: &H256) -> Result<(), String> { + Ok(()) + } } /// Simple private transaction handler. Used for tests. @@ -48,6 +55,8 @@ pub struct SimplePrivateTxHandler { pub txs: Mutex>>, /// imported signed private transactions pub signed_txs: Mutex>>, + /// synced private state hash + pub synced_hash: Mutex, } impl PrivateTxHandler for SimplePrivateTxHandler { @@ -60,4 +69,9 @@ impl PrivateTxHandler for SimplePrivateTxHandler { self.signed_txs.lock().push(rlp.to_vec()); Ok(H256::zero()) } + + fn private_state_synced(&self, hash: &H256) -> Result<(), String> { + *self.synced_hash.lock() = *hash; + Ok(()) + } } diff --git a/ethcore/sync/src/sync_io.rs b/ethcore/sync/src/sync_io.rs index b7a25bafd..5c974e712 100644 --- a/ethcore/sync/src/sync_io.rs +++ b/ethcore/sync/src/sync_io.rs @@ -14,12 +14,14 @@ // You should have received a copy of the GNU General Public License // along with Parity Ethereum. If not, see . +use std::sync::Arc; use std::collections::HashMap; use chain::sync_packet::{PacketInfo, SyncPacket}; use network::{NetworkContext, PeerId, PacketId, Error, SessionInfo, ProtocolId}; use network::client_version::ClientVersion; use bytes::Bytes; use client_traits::BlockChainClient; +use ethcore_private_tx::PrivateStateDB; use types::BlockNumber; use ethcore::snapshot::SnapshotService; use parking_lot::RwLock; @@ -40,6 +42,8 @@ pub trait SyncIo { fn chain(&self) -> &dyn BlockChainClient; /// Get the snapshot service. fn snapshot_service(&self) -> &dyn SnapshotService; + /// Get the private state wrapper + fn private_state(&self) -> Option>; /// Returns peer version identifier fn peer_version(&self, peer_id: PeerId) -> ClientVersion { ClientVersion::from(peer_id.to_string()) @@ -68,6 +72,7 @@ pub struct NetSyncIo<'s> { chain: &'s dyn BlockChainClient, snapshot_service: &'s dyn SnapshotService, chain_overlay: &'s RwLock>, + private_state: Option>, } impl<'s> NetSyncIo<'s> { @@ -75,12 +80,14 @@ impl<'s> NetSyncIo<'s> { pub fn new(network: &'s dyn NetworkContext, chain: &'s dyn BlockChainClient, snapshot_service: &'s dyn SnapshotService, - chain_overlay: &'s RwLock>) -> NetSyncIo<'s> { + chain_overlay: &'s RwLock>, + private_state: Option>) -> NetSyncIo<'s> { NetSyncIo { - network: network, - chain: chain, - snapshot_service: snapshot_service, - chain_overlay: chain_overlay, + network, + chain, + snapshot_service, + chain_overlay, + private_state, } } } @@ -114,6 +121,10 @@ impl<'s> SyncIo for NetSyncIo<'s> { self.snapshot_service } + fn private_state(&self) -> Option> { + self.private_state.clone() + } + fn peer_session_info(&self, peer_id: PeerId) -> Option { self.network.session_info(peer_id) } diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 017e8b3ee..c4b6c7c43 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -27,12 +27,13 @@ use ethcore::client::{TestBlockChainClient, Client as EthcoreClient, ClientConfig, ChainNotify, NewBlocks, ChainMessageType, ClientIoMessage}; use ethcore::snapshot::SnapshotService; use ethcore::spec::{self, Spec}; +use ethcore_private_tx::PrivateStateDB; use ethcore::miner::Miner; use ethcore::test_helpers; use sync_io::SyncIo; use io::{IoChannel, IoContext, IoHandler}; use api::WARP_SYNC_PROTOCOL_ID; -use chain::{ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3}; +use chain::{ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_4}; use chain::sync_packet::{PacketInfo, SyncPacket}; use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket}; @@ -60,20 +61,28 @@ pub struct TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { pub to_disconnect: HashSet, pub packets: Vec, pub peers_info: HashMap, + pub private_state_db: Option>, overlay: RwLock>, } impl<'p, C> TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { - pub fn new(chain: &'p C, ss: &'p TestSnapshotService, queue: &'p RwLock>, sender: Option) -> TestIo<'p, C> { + pub fn new( + chain: &'p C, + ss: &'p TestSnapshotService, + queue: &'p RwLock>, + sender: Option, + private_state_db: Option> + ) -> TestIo<'p, C> { TestIo { - chain: chain, + chain, snapshot_service: ss, - queue: queue, - sender: sender, + queue, + sender, to_disconnect: HashSet::new(), - overlay: RwLock::new(HashMap::new()), packets: Vec::new(), peers_info: HashMap::new(), + private_state_db, + overlay: RwLock::new(HashMap::new()), } } } @@ -131,6 +140,10 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { self.snapshot_service } + fn private_state(&self) -> Option> { + self.private_state_db.clone() + } + fn peer_session_info(&self, _peer_id: PeerId) -> Option { None } @@ -140,7 +153,7 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { } fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 { - if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_3.0 } else { self.eth_protocol_version(peer_id) } + if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_4.0 } else { self.eth_protocol_version(peer_id) } } fn chain_overlay(&self) -> &RwLock> { @@ -220,6 +233,7 @@ pub struct EthPeer where C: FlushingBlockChainClient { pub private_tx_handler: Arc, pub io_queue: RwLock>, new_blocks_queue: RwLock>, + private_state_db: RwLock>>, } impl EthPeer where C: FlushingBlockChainClient { @@ -232,18 +246,20 @@ impl EthPeer where C: FlushingBlockChainClient { } fn process_io_message(&self, message: ChainMessageType) { - let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); + let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None, self.private_state_db()); match message { ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data), ChainMessageType::PrivateTransaction(transaction_hash, data) => self.sync.write().propagate_private_transaction(&mut io, transaction_hash, PrivateTransactionPacket, data), ChainMessageType::SignedPrivateTransaction(transaction_hash, data) => self.sync.write().propagate_private_transaction(&mut io, transaction_hash, SignedPrivateTransactionPacket, data), + ChainMessageType::PrivateStateRequest(hash) => + self.sync.write().request_private_state(&mut io, &hash), } } fn process_new_block_message(&self, message: NewBlockMessage) { - let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); + let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None, self.private_state_db()); self.sync.write().chain_new_blocks( &mut io, &message.imported, @@ -254,6 +270,15 @@ impl EthPeer where C: FlushingBlockChainClient { &message.proposed ); } + + pub fn set_private_state_db(&self, db: Arc) { + *self.private_state_db.write() = Some(db); + } + + fn private_state_db(&self) -> Option> { + let db = self.private_state_db.read(); + db.clone() + } } impl Peer for EthPeer { @@ -265,17 +290,18 @@ impl Peer for EthPeer { &*self.chain, &self.snapshot_service, &self.queue, - Some(other)), + Some(other), + self.private_state_db()), other); } fn on_disconnect(&self, other: PeerId) { - let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(other)); + let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(other), self.private_state_db()); self.sync.write().on_peer_aborting(&mut io, other); } fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet { - let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(from)); + let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(from), self.private_state_db()); SyncSupplier::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data); self.chain.flush(); io.to_disconnect.clone() @@ -291,7 +317,7 @@ impl Peer for EthPeer { } fn sync_step(&self) { - let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); + let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None, self.private_state_db()); self.chain.flush(); self.sync.write().maintain_peers(&mut io); self.sync.write().maintain_sync(&mut io); @@ -300,7 +326,7 @@ impl Peer for EthPeer { } fn restart_sync(&self) { - self.sync.write().restart(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); + self.sync.write().restart(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None, self.private_state_db())); } fn process_all_io_messages(&self) { @@ -357,6 +383,7 @@ impl TestNet> { private_tx_handler, io_queue: RwLock::new(VecDeque::new()), new_blocks_queue: RwLock::new(VecDeque::new()), + private_state_db: RwLock::new(None), })); } net @@ -410,6 +437,7 @@ impl TestNet> { private_tx_handler, io_queue: RwLock::new(VecDeque::new()), new_blocks_queue: RwLock::new(VecDeque::new()), + private_state_db: RwLock::new(None), }); peer.chain.add_notify(peer.clone()); //private_provider.add_notify(peer.clone()); @@ -508,7 +536,7 @@ impl

TestNet

where P: Peer { impl TestNet> { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { let peer = &mut self.peers[peer_id]; - peer.sync.write().chain_new_blocks(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None), &[], &[], &[], &[], &[], &[]); + peer.sync.write().chain_new_blocks(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None, None), &[], &[], &[], &[], &[], &[]); } } diff --git a/ethcore/sync/src/tests/private.rs b/ethcore/sync/src/tests/private.rs index 1d91e2124..27fa6f013 100644 --- a/ethcore/sync/src/tests/private.rs +++ b/ethcore/sync/src/tests/private.rs @@ -26,7 +26,7 @@ use ethcore::{ client::ClientIoMessage, miner::{self, MinerService}, spec::Spec, - test_helpers::push_block_with_transactions, + test_helpers::{push_block_with_transactions, new_db}, }; use ethcore_private_tx::{Provider, ProviderConfig, NoopEncryptor, Importer, SignedPrivateTransaction, StoringKeyProvider}; use ethkey::KeyPair; @@ -73,16 +73,18 @@ fn send_private_transaction() { validator_accounts: vec![s1.address()], signer_account: None, logs_path: None, + use_offchain_storage: false, }; let signer_config = ProviderConfig{ validator_accounts: Vec::new(), signer_account: Some(s0.address()), logs_path: None, + use_offchain_storage: false, }; let private_keys = Arc::new(StoringKeyProvider::default()); - + let db = new_db(); let pm0 = Arc::new(Provider::new( client0.clone(), net.peer(0).miner.clone(), @@ -91,6 +93,7 @@ fn send_private_transaction() { signer_config, IoChannel::to_handler(Arc::downgrade(&io_handler0)), private_keys.clone(), + db.key_value().clone(), )); pm0.add_notify(net.peers[0].clone()); @@ -102,6 +105,7 @@ fn send_private_transaction() { validator_config, IoChannel::to_handler(Arc::downgrade(&io_handler1)), private_keys.clone(), + db.key_value().clone(), )); pm1.add_notify(net.peers[1].clone()); @@ -157,3 +161,134 @@ fn send_private_transaction() { let local_transactions = net.peer(0).miner.local_transactions(); assert_eq!(local_transactions.len(), 1); } + +#[test] +fn sync_private_state() { + // Setup two clients + let s0 = KeyPair::from_secret_slice(&keccak("1").as_bytes()).unwrap(); + let s1 = KeyPair::from_secret_slice(&keccak("0").as_bytes()).unwrap(); + + let signer = Arc::new(ethcore_private_tx::KeyPairSigner(vec![s0.clone(), s1.clone()])); + + let mut net = TestNet::with_spec(2, SyncConfig::default(), seal_spec); + let client0 = net.peer(0).chain.clone(); + let client1 = net.peer(1).chain.clone(); + let io_handler0: Arc> = Arc::new(TestIoHandler::new(net.peer(0).chain.clone())); + let io_handler1: Arc> = Arc::new(TestIoHandler::new(net.peer(1).chain.clone())); + + net.peer(0).miner.set_author(miner::Author::Sealer(signer::from_keypair(s0.clone()))); + net.peer(1).miner.set_author(miner::Author::Sealer(signer::from_keypair(s1.clone()))); + net.peer(0).chain.engine().register_client(Arc::downgrade(&net.peer(0).chain) as _); + net.peer(1).chain.engine().register_client(Arc::downgrade(&net.peer(1).chain) as _); + net.peer(0).chain.set_io_channel(IoChannel::to_handler(Arc::downgrade(&io_handler0))); + net.peer(1).chain.set_io_channel(IoChannel::to_handler(Arc::downgrade(&io_handler1))); + + let (address, _) = contract_address(CreateContractAddress::FromSenderAndNonce, &s0.address(), &0.into(), &[]); + let chain_id = client0.signing_chain_id(); + + // Exhange statuses + net.sync(); + + // Setup private providers + let validator_config = ProviderConfig{ + validator_accounts: vec![s1.address()], + signer_account: None, + logs_path: None, + use_offchain_storage: true, + }; + + let signer_config = ProviderConfig{ + validator_accounts: Vec::new(), + signer_account: Some(s0.address()), + logs_path: None, + use_offchain_storage: true, + }; + + let private_keys = Arc::new(StoringKeyProvider::default()); + let db0 = new_db(); + let pm0 = Arc::new(Provider::new( + client0.clone(), + net.peer(0).miner.clone(), + signer.clone(), + Box::new(NoopEncryptor::default()), + signer_config, + IoChannel::to_handler(Arc::downgrade(&io_handler0)), + private_keys.clone(), + db0.key_value().clone(), + )); + pm0.add_notify(net.peers[0].clone()); + + let db1 = new_db(); + let pm1 = Arc::new(Provider::new( + client1.clone(), + net.peer(1).miner.clone(), + signer.clone(), + Box::new(NoopEncryptor::default()), + validator_config, + IoChannel::to_handler(Arc::downgrade(&io_handler1)), + private_keys.clone(), + db1.key_value().clone(), + )); + pm1.add_notify(net.peers[1].clone()); + + net.peer(0).set_private_state_db(pm0.private_state_db()); + net.peer(1).set_private_state_db(pm1.private_state_db()); + + // Create and deploy contract + let private_contract_test = "6060604052341561000f57600080fd5b60d88061001d6000396000f30060606040526000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff1680630c55699c146046578063bc64b76d14607457600080fd5b3415605057600080fd5b60566098565b60405180826000191660001916815260200191505060405180910390f35b3415607e57600080fd5b6096600480803560001916906020019091905050609e565b005b60005481565b8060008160001916905550505600a165627a7a723058206acbdf4b15ca4c2d43e1b1879b830451a34f1e9d02ff1f2f394d8d857e79d2080029".from_hex().unwrap(); + let mut private_create_tx = Transaction::default(); + private_create_tx.action = Action::Create; + private_create_tx.data = private_contract_test; + private_create_tx.gas = 200000.into(); + let private_create_tx_signed = private_create_tx.sign(&s0.secret(), None); + let validators = vec![s1.address()]; + let (public_tx, _) = pm0.public_creation_transaction(BlockId::Latest, &private_create_tx_signed, &validators, 0.into()).unwrap(); + let public_tx = public_tx.sign(&s0.secret(), chain_id); + + let public_tx_copy = public_tx.clone(); + push_block_with_transactions(&client0, &[public_tx]); + push_block_with_transactions(&client1, &[public_tx_copy]); + + net.sync(); + + //Create private transaction for modifying state + let mut private_tx = Transaction::default(); + private_tx.action = Action::Call(address.clone()); + private_tx.data = "bc64b76d2a00000000000000000000000000000000000000000000000000000000000000".from_hex().unwrap(); //setX(42) + private_tx.gas = 120000.into(); + private_tx.nonce = 1.into(); + let private_tx = private_tx.sign(&s0.secret(), None); + let _create_res = pm0.create_private_transaction(private_tx); + + //send private transaction message to validator + net.sync(); + + let validator_handler = net.peer(1).private_tx_handler.clone(); + let received_private_transactions = validator_handler.txs.lock().clone(); + assert_eq!(received_private_transactions.len(), 1); + + //process received private transaction message + let private_transaction = received_private_transactions[0].clone(); + let _import_res = pm1.import_private_transaction(&private_transaction); + + // Second node requests the state from the first one + net.sync(); + + let synced_hash = validator_handler.synced_hash.lock().clone(); + assert!(pm1.private_state_synced(&synced_hash).is_ok()); + + // Second node has private state up-to-date and can verify the private transaction + // Further should work the standard flow + net.sync(); + let sender_handler = net.peer(0).private_tx_handler.clone(); + let received_signed_private_transactions = sender_handler.signed_txs.lock().clone(); + assert_eq!(received_signed_private_transactions.len(), 1); + + //process signed response + let signed_private_transaction = received_signed_private_transactions[0].clone(); + assert!(pm0.import_signed_private_transaction(&signed_private_transaction).is_ok()); + let signature: SignedPrivateTransaction = Rlp::new(&signed_private_transaction).as_val().unwrap(); + assert!(pm0.process_signature(&signature).is_ok()); + let local_transactions = net.peer(0).miner.local_transactions(); + assert_eq!(local_transactions.len(), 1); +} diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index 96e68056d..a38f2b944 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -357,6 +357,10 @@ usage! { "--private-tx-enabled", "Enable private transactions.", + FLAG flag_private_state_offchain: (bool) = false, or |c: &Config| c.private_tx.as_ref()?.state_offchain, + "--private-state-offchain", + "Store private state offchain (in the local DB).", + ARG arg_private_signer: (Option) = None, or |c: &Config| c.private_tx.as_ref()?.signer.clone(), "--private-signer=[ACCOUNT]", "Specify the account for signing public transaction created upon verified private transaction.", @@ -1200,6 +1204,7 @@ struct Account { #[serde(deny_unknown_fields)] struct PrivateTransactions { enabled: Option, + state_offchain: Option, signer: Option, validators: Option>, account: Option, @@ -1766,6 +1771,7 @@ mod tests { // -- Private Transactions Options flag_private_enabled: true, + flag_private_state_offchain: false, arg_private_signer: Some("0xdeadbeefcafe0000000000000000000000000000".into()), arg_private_validators: Some("0xdeadbeefcafe0000000000000000000000000000".into()), arg_private_passwords: Some("~/.safe/password.file".into()), diff --git a/parity/configuration.rs b/parity/configuration.rs index b9a8d37d2..007436b1e 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -926,7 +926,8 @@ impl Configuration { logs_path: match self.args.flag_private_enabled { true => Some(dirs.base), false => None, - } + }, + use_offchain_storage: self.args.flag_private_state_offchain, }; let encryptor_conf = EncryptorConfig { diff --git a/parity/db/rocksdb/migration.rs b/parity/db/rocksdb/migration.rs index a054f3689..f9e92570b 100644 --- a/parity/db/rocksdb/migration.rs +++ b/parity/db/rocksdb/migration.rs @@ -42,10 +42,18 @@ pub const TO_V12: ChangeColumns = ChangeColumns { version: 12, }; +/// The migration from v12 to v14. +/// Adds a column for private transactions state storage. +pub const TO_V14: ChangeColumns = ChangeColumns { + pre_columns: Some(8), + post_columns: Some(9), + version: 14, +}; + /// Database is assumed to be at default version, when no version file is found. const DEFAULT_VERSION: u32 = 5; /// Current version of database models. -const CURRENT_VERSION: u32 = 13; +const CURRENT_VERSION: u32 = 14; /// A version of database at which blooms-db was introduced const BLOOMS_DB_VERSION: u32 = 13; /// Defines how many items are migrated to the new version of database at once. @@ -147,6 +155,7 @@ fn consolidated_database_migrations(compaction_profile: &CompactionProfile) -> R let mut manager = MigrationManager::new(default_migration_settings(compaction_profile)); manager.add_migration(TO_V11).map_err(|_| Error::MigrationImpossible)?; manager.add_migration(TO_V12).map_err(|_| Error::MigrationImpossible)?; + manager.add_migration(TO_V14).map_err(|_| Error::MigrationImpossible)?; Ok(manager) } diff --git a/parity/modules.rs b/parity/modules.rs index f8607b827..7b56fa6d5 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -19,6 +19,7 @@ use std::sync::{Arc, mpsc}; use client_traits::BlockChainClient; use sync::{self, SyncConfig, NetworkConfiguration, Params, ConnectionFilter}; use ethcore::snapshot::SnapshotService; +use ethcore_private_tx::PrivateStateDB; use light::Provider; use parity_runtime::Executor; @@ -40,6 +41,7 @@ pub fn sync( chain: Arc, snapshot_service: Arc, private_tx_handler: Option>, + private_state: Option>, provider: Arc, _log_settings: &LogConfig, connection_filter: Option>, @@ -51,6 +53,7 @@ pub fn sync( provider, snapshot_service, private_tx_handler, + private_state, network_config, }, connection_filter)?; diff --git a/parity/run.rs b/parity/run.rs index 82ae9694f..efd0781f3 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -626,9 +626,9 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: .map_err(|e| format!("Stratum start error: {:?}", e))?; } - let private_tx_sync: Option> = match cmd.private_tx_enabled { - true => Some(private_tx_service.clone() as Arc), - false => None, + let (private_tx_sync, private_state) = match cmd.private_tx_enabled { + true => (Some(private_tx_service.clone() as Arc), Some(private_tx_provider.private_state_db())), + false => (None, None), }; // create sync object @@ -639,6 +639,7 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: client.clone(), snapshot_service.clone(), private_tx_sync, + private_state, client.clone(), &cmd.logger_config, connection_filter.clone().map(|f| f as Arc<::sync::ConnectionFilter + 'static>), diff --git a/rpc/src/v1/types/private_log.rs b/rpc/src/v1/types/private_log.rs index 2c90bcfa9..788036c38 100644 --- a/rpc/src/v1/types/private_log.rs +++ b/rpc/src/v1/types/private_log.rs @@ -24,6 +24,10 @@ use ethcore_private_tx::{TransactionLog as EthTransactionLog, ValidatorLog as Et pub enum Status { /// Private tx was created but no validation received yet Created, + /// Private state not found locally and being retrived from peers + PrivateStateSync, + /// Retrieval of the private state failed, transaction not created + PrivateStateSyncFailed, /// Several validators (but not all) validated the transaction Validating, /// All validators validated the private tx @@ -35,6 +39,8 @@ impl From for Status { fn from(c: EthStatus) -> Self { match c { EthStatus::Created => Status::Created, + EthStatus::PrivateStateSync => Status::PrivateStateSync, + EthStatus::PrivateStateSyncFailed => Status::PrivateStateSyncFailed, EthStatus::Validating => Status::Validating, EthStatus::Deployed => Status::Deployed, }