Private contract migration and offchain state sync (#10748)

* Temp storage for the private state added

* Temp storage for the private state added

* Request message added

* Store and retrieve offchain state logic

* State sync cache

* Private state column added to key value db

* Private state column added to key value db

* Indexing stored states via its hash

* Works with errors changed

* Private state stored into the local db

* Access to private state db added to sync io

* Private state db file added

* Rlp packets for retrieiving private state data added

* Handling of private sync completed message

* Test code fixed

* External flag for offchain storing added

* Test for private state sync added

* Saving private state logic corrected

* Migration code corrected

* Fixes after merge with master

* Merge with head

* Additional checks for slices

* Log for private state retrieval added

* Limit time of retrieving private states

* Store required hashes for every request, mark them stale if needed

* Store requested private state hashes and check received data

* Log stale requests

* State insertion fix

* Refactoring of how logging passed to state store

* Heapsize removed, syncing hashes structure reworked

* Check state length returned by contract

* Get rid of OverlayDB

* hash-db version updated

* Test fixed

* One more test fixed
This commit is contained in:
Anton Gavrilov 2019-08-16 14:45:52 +02:00 committed by GitHub
parent bd1a578f93
commit 66e4410be7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 990 additions and 141 deletions

5
Cargo.lock generated
View File

@ -1230,6 +1230,7 @@ dependencies = [
"ethabi-derive 8.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethabi-derive 8.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore 1.12.0", "ethcore 1.12.0",
"ethcore-call-contract 0.1.0", "ethcore-call-contract 0.1.0",
"ethcore-db 0.1.0",
"ethcore-io 1.12.0", "ethcore-io 1.12.0",
"ethcore-miner 1.12.0", "ethcore-miner 1.12.0",
"ethereum-types 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethereum-types 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1237,7 +1238,11 @@ dependencies = [
"ethkey 0.3.0", "ethkey 0.3.0",
"fetch 0.1.0", "fetch 0.1.0",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"hash-db 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
"journaldb 0.2.0",
"keccak-hash 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "keccak-hash 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"keccak-hasher 0.1.1",
"kvdb 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"machine 0.1.0", "machine 0.1.0",
"parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -41,8 +41,10 @@ pub const COL_ACCOUNT_BLOOM: Option<u32> = Some(5);
pub const COL_NODE_INFO: Option<u32> = Some(6); pub const COL_NODE_INFO: Option<u32> = Some(6);
/// Column for the light client chain. /// Column for the light client chain.
pub const COL_LIGHT_CHAIN: Option<u32> = Some(7); pub const COL_LIGHT_CHAIN: Option<u32> = Some(7);
/// Column for the private transactions state.
pub const COL_PRIVATE_TRANSACTIONS_STATE: Option<u32> = Some(8);
/// Number of columns in DB /// Number of columns in DB
pub const NUM_COLUMNS: Option<u32> = Some(8); pub const NUM_COLUMNS: Option<u32> = Some(9);
/// Modes for updating caches. /// Modes for updating caches.
#[derive(Clone, Copy)] #[derive(Clone, Copy)]

View File

@ -14,6 +14,7 @@ ethabi = "8.0"
ethabi-contract = "8.0" ethabi-contract = "8.0"
ethabi-derive = "8.0" ethabi-derive = "8.0"
ethcore = { path = ".." } ethcore = { path = ".." }
ethcore-db = { path = "../db" }
ethcore-call-contract = { path = "../call-contract" } ethcore-call-contract = { path = "../call-contract" }
ethcore-io = { path = "../../util/io" } ethcore-io = { path = "../../util/io" }
ethcore-miner = { path = "../../miner" } ethcore-miner = { path = "../../miner" }
@ -23,9 +24,13 @@ ethkey = { path = "../../accounts/ethkey" }
fetch = { path = "../../util/fetch" } fetch = { path = "../../util/fetch" }
futures = "0.1" futures = "0.1"
parity-util-mem = "0.2.0" parity-util-mem = "0.2.0"
hash-db = "0.15.0"
keccak-hash = "0.2.0" keccak-hash = "0.2.0"
keccak-hasher = { path = "../../util/keccak-hasher" }
kvdb = "0.1"
log = "0.4" log = "0.4"
machine = { path = "../machine" } machine = { path = "../machine" }
journaldb = { path = "../../util/journaldb" }
parity-bytes = "0.1" parity-bytes = "0.1"
parity-crypto = "0.4.0" parity-crypto = "0.4.0"
parking_lot = "0.8" parking_lot = "0.8"

View File

@ -99,6 +99,12 @@ pub enum Error {
/// Account for signing requests to key server not set. /// Account for signing requests to key server not set.
#[display(fmt = "Account for signing requests to key server not set.")] #[display(fmt = "Account for signing requests to key server not set.")]
KeyServerAccountNotSet, KeyServerAccountNotSet,
/// Private state for the contract was not found in the local storage.
#[display(fmt = "Private state for the contract was not found in the local storage.")]
PrivateStateNotFound,
/// Cannot write state to the local database.
#[display(fmt = "Cannot write state to the local database.")]
DatabaseWriteError,
/// Encryption key is not found on key server. /// Encryption key is not found on key server.
#[display(fmt = "Encryption key is not found on key server for {}", _0)] #[display(fmt = "Encryption key is not found on key server for {}", _0)]
EncryptionKeyNotFound(Address), EncryptionKeyNotFound(Address),

View File

@ -22,6 +22,8 @@ mod private_transactions;
mod messages; mod messages;
mod error; mod error;
mod log; mod log;
mod state_store;
mod private_state_db;
extern crate account_state; extern crate account_state;
extern crate client_traits; extern crate client_traits;
@ -29,6 +31,7 @@ extern crate common_types as types;
extern crate ethabi; extern crate ethabi;
extern crate ethcore; extern crate ethcore;
extern crate ethcore_call_contract as call_contract; extern crate ethcore_call_contract as call_contract;
extern crate ethcore_db;
extern crate ethcore_io as io; extern crate ethcore_io as io;
extern crate ethcore_miner; extern crate ethcore_miner;
extern crate ethereum_types; extern crate ethereum_types;
@ -37,8 +40,12 @@ extern crate ethkey;
extern crate fetch; extern crate fetch;
extern crate futures; extern crate futures;
extern crate parity_util_mem; extern crate parity_util_mem;
extern crate hash_db;
extern crate keccak_hash as hash; extern crate keccak_hash as hash;
extern crate keccak_hasher;
extern crate kvdb;
extern crate machine; extern crate machine;
extern crate journaldb;
extern crate parity_bytes as bytes; extern crate parity_bytes as bytes;
extern crate parity_crypto as crypto; extern crate parity_crypto as crypto;
extern crate parking_lot; extern crate parking_lot;
@ -77,18 +84,21 @@ pub use encryptor::{Encryptor, SecretStoreEncryptor, EncryptorConfig, NoopEncryp
pub use key_server_keys::{KeyProvider, SecretStoreKeys, StoringKeyProvider}; pub use key_server_keys::{KeyProvider, SecretStoreKeys, StoringKeyProvider};
pub use private_transactions::{VerifiedPrivateTransaction, VerificationStore, PrivateTransactionSigningDesc, SigningStore}; pub use private_transactions::{VerifiedPrivateTransaction, VerificationStore, PrivateTransactionSigningDesc, SigningStore};
pub use messages::{PrivateTransaction, SignedPrivateTransaction}; pub use messages::{PrivateTransaction, SignedPrivateTransaction};
pub use private_state_db::PrivateStateDB;
pub use error::Error; pub use error::Error;
pub use log::{Logging, TransactionLog, ValidatorLog, PrivateTxStatus, FileLogsSerializer}; pub use log::{Logging, TransactionLog, ValidatorLog, PrivateTxStatus, FileLogsSerializer};
use state_store::{PrivateStateStorage, RequestType};
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::collections::{HashMap, HashSet, BTreeMap}; use std::collections::{HashMap, HashSet, BTreeMap};
use std::time::Duration;
use ethereum_types::{H128, H256, U256, Address, BigEndianHash}; use ethereum_types::{H128, H256, U256, Address, BigEndianHash};
use hash::keccak; use hash::keccak;
use rlp::*; use rlp::*;
use parking_lot::RwLock; use parking_lot::RwLock;
use bytes::Bytes; use bytes::Bytes;
use ethkey::{Signature, recover, public_to_address}; use ethkey::{Signature, recover, public_to_address};
use io::IoChannel; use io::{IoChannel, IoHandler, IoContext, TimerToken};
use machine::{ use machine::{
executive::{Executive, TransactOptions, contract_address as ethcore_contract_address}, executive::{Executive, TransactOptions, contract_address as ethcore_contract_address},
executed::Executed as FlatExecuted, executed::Executed as FlatExecuted,
@ -107,6 +117,7 @@ use state_db::StateDB;
use account_state::State; use account_state::State;
use trace::{Tracer, VMTracer}; use trace::{Tracer, VMTracer};
use call_contract::CallContract; use call_contract::CallContract;
use kvdb::KeyValueDB;
use rustc_hex::FromHex; use rustc_hex::FromHex;
use ethabi::FunctionOutputDecoder; use ethabi::FunctionOutputDecoder;
use vm::CreateContractAddress; use vm::CreateContractAddress;
@ -128,6 +139,12 @@ const INITIAL_PRIVATE_CONTRACT_VER: usize = 1;
/// Version for the private contract notification about private state changes added /// Version for the private contract notification about private state changes added
const PRIVATE_CONTRACT_WITH_NOTIFICATION_VER: usize = 2; const PRIVATE_CONTRACT_WITH_NOTIFICATION_VER: usize = 2;
/// Timer for private state retrieval
const STATE_RETRIEVAL_TIMER: TimerToken = 0;
/// Timer for private state retrieval, 5 secs duration
const STATE_RETRIEVAL_TICK: Duration = Duration::from_secs(5);
/// Configurtion for private transaction provider /// Configurtion for private transaction provider
#[derive(Default, PartialEq, Debug, Clone)] #[derive(Default, PartialEq, Debug, Clone)]
pub struct ProviderConfig { pub struct ProviderConfig {
@ -137,6 +154,8 @@ pub struct ProviderConfig {
pub signer_account: Option<Address>, pub signer_account: Option<Address>,
/// Path to private tx logs /// Path to private tx logs
pub logs_path: Option<String>, pub logs_path: Option<String>,
/// Provider should store the state of the private contract offchain (in DB)
pub use_offchain_storage: bool,
} }
#[derive(Debug)] #[derive(Debug)]
@ -198,6 +217,8 @@ pub struct Provider {
channel: IoChannel<ClientIoMessage>, channel: IoChannel<ClientIoMessage>,
keys_provider: Arc<KeyProvider>, keys_provider: Arc<KeyProvider>,
logging: Option<Logging>, logging: Option<Logging>,
use_offchain_storage: bool,
state_storage: PrivateStateStorage,
} }
#[derive(Debug)] #[derive(Debug)]
@ -218,6 +239,7 @@ impl Provider {
config: ProviderConfig, config: ProviderConfig,
channel: IoChannel<ClientIoMessage>, channel: IoChannel<ClientIoMessage>,
keys_provider: Arc<KeyProvider>, keys_provider: Arc<KeyProvider>,
db: Arc<KeyValueDB>,
) -> Self { ) -> Self {
keys_provider.update_acl_contract(); keys_provider.update_acl_contract();
Provider { Provider {
@ -233,9 +255,16 @@ impl Provider {
channel, channel,
keys_provider, keys_provider,
logging: config.logs_path.map(|path| Logging::new(Arc::new(FileLogsSerializer::with_path(path)))), logging: config.logs_path.map(|path| Logging::new(Arc::new(FileLogsSerializer::with_path(path)))),
use_offchain_storage: config.use_offchain_storage,
state_storage: PrivateStateStorage::new(db),
} }
} }
/// Returns private state DB
pub fn private_state_db(&self) -> Arc<PrivateStateDB> {
self.state_storage.private_state_db()
}
// TODO [ToDr] Don't use `ChainNotify` here! // TODO [ToDr] Don't use `ChainNotify` here!
// Better to create a separate notification type for this. // Better to create a separate notification type for this.
/// Adds an actor to be notified on certain events /// Adds an actor to be notified on certain events
@ -273,22 +302,42 @@ impl Provider {
// best would be to change the API and only allow H256 instead of BlockID // best would be to change the API and only allow H256 instead of BlockID
// in private-tx to avoid such mistakes. // in private-tx to avoid such mistakes.
let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?; let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?;
let private_state = self.execute_private_transaction(BlockId::Latest, &signed_transaction)?; let private_state = self.execute_private_transaction(BlockId::Latest, &signed_transaction);
trace!(target: "privatetx", "Private transaction created, encrypted transaction: {:?}, private state: {:?}", private, private_state); match private_state {
let contract_validators = self.get_validators(BlockId::Latest, &contract)?; Err(err) => {
trace!(target: "privatetx", "Required validators: {:?}", contract_validators); match err {
let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); Error::PrivateStateNotFound => {
trace!(target: "privatetx", "Hashed effective private state for sender: {:?}", private_state_hash); trace!(target: "privatetx", "Private state for the contract not found, requesting from peers");
self.transactions_for_signing.write().add_transaction(private.hash(), signed_transaction, &contract_validators, private_state, contract_nonce)?; if let Some(ref logging) = self.logging {
self.broadcast_private_transaction(private.hash(), private.rlp_bytes()); let contract_validators = self.get_validators(BlockId::Latest, &contract)?;
if let Some(ref logging) = self.logging { logging.private_tx_created(&tx_hash, &contract_validators);
logging.private_tx_created(&tx_hash, &contract_validators); logging.private_state_request(&tx_hash);
}
let request = RequestType::Creation(signed_transaction);
self.request_private_state(&contract, request)?;
},
_ => {},
}
Err(err)
}
Ok(private_state) => {
trace!(target: "privatetx", "Private transaction created, encrypted transaction: {:?}, private state: {:?}", private, private_state);
let contract_validators = self.get_validators(BlockId::Latest, &contract)?;
trace!(target: "privatetx", "Required validators: {:?}", contract_validators);
let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce);
trace!(target: "privatetx", "Hashed effective private state for sender: {:?}", private_state_hash);
self.transactions_for_signing.write().add_transaction(private.hash(), signed_transaction, &contract_validators, private_state, contract_nonce)?;
self.broadcast_private_transaction(private.hash(), private.rlp_bytes());
if let Some(ref logging) = self.logging {
logging.private_tx_created(&tx_hash, &contract_validators);
}
Ok(Receipt {
hash: tx_hash,
contract_address: contract,
status_code: 0,
})
}
} }
Ok(Receipt {
hash: tx_hash,
contract_address: contract,
status_code: 0,
})
} }
/// Calculate hash from united private state and contract nonce /// Calculate hash from united private state and contract nonce
@ -312,55 +361,52 @@ impl Provider {
) )
} }
/// Retrieve and verify the first available private transaction for every sender fn process_verification_transaction(&self, transaction: &VerifiedPrivateTransaction) -> Result<(), Error> {
fn process_verification_queue(&self) -> Result<(), Error> { let private_hash = transaction.private_transaction.hash();
let process_transaction = |transaction: &VerifiedPrivateTransaction| -> Result<_, String> { match transaction.validator_account {
let private_hash = transaction.private_transaction.hash(); None => {
match transaction.validator_account { trace!(target: "privatetx", "Propagating transaction further");
None => { self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes());
return Ok(());
}
Some(validator_account) => {
if !self.validator_accounts.contains(&validator_account) {
trace!(target: "privatetx", "Propagating transaction further"); trace!(target: "privatetx", "Propagating transaction further");
self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes()); self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes());
return Ok(()); return Ok(());
} }
Some(validator_account) => { let contract = Self::contract_address_from_transaction(&transaction.transaction)?;
if !self.validator_accounts.contains(&validator_account) { // TODO #9825 [ToDr] Usage of BlockId::Latest
trace!(target: "privatetx", "Propagating transaction further"); let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?;
self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes()); let private_state = self.execute_private_transaction(BlockId::Latest, &transaction.transaction)?;
return Ok(()); let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce);
} trace!(target: "privatetx", "Hashed effective private state for validator: {:?}", private_state_hash);
let contract = Self::contract_address_from_transaction(&transaction.transaction) let signed_state = self.accounts.sign(validator_account, private_state_hash)?;
.map_err(|_| "Incorrect type of action for the transaction")?; let signed_private_transaction = SignedPrivateTransaction::new(private_hash, signed_state, None);
// TODO #9825 [ToDr] Usage of BlockId::Latest trace!(target: "privatetx", "Sending signature for private transaction: {:?}", signed_private_transaction);
let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest); self.broadcast_signed_private_transaction(signed_private_transaction.hash(), signed_private_transaction.rlp_bytes());
if let Err(e) = contract_nonce {
return Err(format!("Cannot retrieve contract nonce: {:?}", e).into());
}
let contract_nonce = contract_nonce.expect("Error was checked before");
let private_state = self.execute_private_transaction(BlockId::Latest, &transaction.transaction);
if let Err(e) = private_state {
return Err(format!("Cannot retrieve private state: {:?}", e).into());
}
let private_state = private_state.expect("Error was checked before");
let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce);
trace!(target: "privatetx", "Hashed effective private state for validator: {:?}", private_state_hash);
let signed_state = self.accounts.sign(validator_account, private_state_hash);
if let Err(e) = signed_state {
return Err(format!("Cannot sign the state: {:?}", e).into());
}
let signed_state = signed_state.expect("Error was checked before");
let signed_private_transaction = SignedPrivateTransaction::new(private_hash, signed_state, None);
trace!(target: "privatetx", "Sending signature for private transaction: {:?}", signed_private_transaction);
self.broadcast_signed_private_transaction(signed_private_transaction.hash(), signed_private_transaction.rlp_bytes());
}
} }
Ok(()) }
}; Ok(())
}
/// Retrieve and verify the first available private transaction for every sender
fn process_verification_queue(&self) -> Result<(), Error> {
let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE); let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE);
let local_accounts = HashSet::new(); let local_accounts = HashSet::new();
let ready_transactions = self.transactions_for_verification.drain(self.pool_client(&nonce_cache, &local_accounts)); let ready_transactions = self.transactions_for_verification.drain(self.pool_client(&nonce_cache, &local_accounts));
for transaction in ready_transactions { for transaction in ready_transactions {
if let Err(e) = process_transaction(&transaction) { if let Err(err) = self.process_verification_transaction(&transaction) {
warn!(target: "privatetx", "Error: {:?}", e); warn!(target: "privatetx", "Error: {:?}", err);
match err {
Error::PrivateStateNotFound => {
let contract = transaction.private_transaction.contract();
trace!(target: "privatetx", "Private state for the contract {:?} not found, requesting from peers", &contract);
let request = RequestType::Verification(transaction);
self.request_private_state(&contract, request)?;
}
_ => {}
}
} }
} }
Ok(()) Ok(())
@ -383,6 +429,7 @@ impl Provider {
let original_tx_hash = desc.original_transaction.hash(); let original_tx_hash = desc.original_transaction.hash();
if last.0 { if last.0 {
let contract = Self::contract_address_from_transaction(&desc.original_transaction)?;
let mut signatures = desc.received_signatures.clone(); let mut signatures = desc.received_signatures.clone();
signatures.push(signed_tx.signature()); signatures.push(signed_tx.signature());
let rsv: Vec<Signature> = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect(); let rsv: Vec<Signature> = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect();
@ -411,7 +458,6 @@ impl Provider {
} }
} }
// Notify about state changes // Notify about state changes
let contract = Self::contract_address_from_transaction(&desc.original_transaction)?;
// TODO #9825 Usage of BlockId::Latest // TODO #9825 Usage of BlockId::Latest
if self.get_contract_version(BlockId::Latest, &contract) >= PRIVATE_CONTRACT_WITH_NOTIFICATION_VER { if self.get_contract_version(BlockId::Latest, &contract) >= PRIVATE_CONTRACT_WITH_NOTIFICATION_VER {
match self.state_changes_notify(BlockId::Latest, &contract, &desc.original_transaction.sender(), desc.original_transaction.hash()) { match self.state_changes_notify(BlockId::Latest, &contract, &desc.original_transaction.sender(), desc.original_transaction.hash()) {
@ -489,6 +535,75 @@ impl Provider {
self.notify(|notify| notify.broadcast(ChainMessageType::SignedPrivateTransaction(transaction_hash, message.clone()))); self.notify(|notify| notify.broadcast(ChainMessageType::SignedPrivateTransaction(transaction_hash, message.clone())));
} }
fn request_private_state(&self, address: &Address, request_type: RequestType) -> Result<(), Error> {
// Define the list of available contracts
let mut private_contracts = Vec::new();
private_contracts.push(*address);
if let Some(key_server_account) = self.keys_provider.key_server_account() {
if let Some(available_contracts) = self.keys_provider.available_keys(BlockId::Latest, &key_server_account) {
for private_contract in available_contracts {
if private_contract == *address {
continue;
}
private_contracts.push(private_contract);
}
}
}
// Check states for the avaialble contracts, if they're outdated
let mut stalled_contracts_hashes: HashSet<H256> = HashSet::new();
for address in private_contracts {
if let Ok(state_hash) = self.get_decrypted_state_from_contract(&address, BlockId::Latest) {
if state_hash.len() != H256::len_bytes() {
return Err(Error::StateIncorrect);
}
let state_hash = H256::from_slice(&state_hash);
if let Err(_) = self.state_storage.private_state_db().state(&state_hash) {
// State not found in the local db
stalled_contracts_hashes.insert(state_hash);
}
}
}
let hashes_to_sync = self.state_storage.add_request(request_type, stalled_contracts_hashes);
if !hashes_to_sync.is_empty() {
trace!(target: "privatetx", "Requesting states for the following hashes: {:?}", hashes_to_sync);
for hash in hashes_to_sync {
self.notify(|notify| notify.broadcast(ChainMessageType::PrivateStateRequest(hash)));
}
}
Ok(())
}
fn private_state_sync_completed(&self, hash: &H256) -> Result<(), Error> {
self.state_storage.state_sync_completed(hash);
if self.state_storage.requests_ready() {
trace!(target: "privatetx", "Private state sync completed, processing pending requests");
let ready_requests = self.state_storage.drain_ready_requests();
for request in ready_requests {
match request {
RequestType::Creation(transaction) => {
match self.create_private_transaction(transaction) {
Ok(receipt) => trace!(target: "privatetx", "Creation request processed, receipt: {:?}", receipt),
Err(e) => error!(target: "privatetx", "Cannot process creation request with error: {:?}", e),
}
}
RequestType::Verification(transaction) => {
if let Err(err) = self.process_verification_transaction(&transaction) {
warn!(target: "privatetx", "Error while processing pending verification request: {:?}", err);
match err {
Error::PrivateStateNotFound => {
let contract = transaction.private_transaction.contract();
error!(target: "privatetx", "Cannot retrieve private state after sync for {:?}", &contract);
}
_ => {}
}
}
}
}
}
}
Ok(())
}
fn iv_from_transaction(transaction: &SignedTransaction) -> H128 { fn iv_from_transaction(transaction: &SignedTransaction) -> H128 {
let nonce = keccak(&transaction.nonce.rlp_bytes()); let nonce = keccak(&transaction.nonce.rlp_bytes());
let (iv, _) = nonce.as_bytes().split_at(INIT_VEC_LEN); let (iv, _) = nonce.as_bytes().split_at(INIT_VEC_LEN);
@ -512,10 +627,28 @@ impl Provider {
} }
fn get_decrypted_state(&self, address: &Address, block: BlockId) -> Result<Bytes, Error> { fn get_decrypted_state(&self, address: &Address, block: BlockId) -> Result<Bytes, Error> {
match self.use_offchain_storage {
true => {
let hashed_state = self.get_decrypted_state_from_contract(address, block)?;
if hashed_state.len() != H256::len_bytes() {
return Err(Error::StateIncorrect);
}
let hashed_state = H256::from_slice(&hashed_state);
let stored_state_data = self.state_storage.private_state_db().state(&hashed_state)?;
self.decrypt(address, &stored_state_data)
}
false => self.get_decrypted_state_from_contract(address, block),
}
}
fn get_decrypted_state_from_contract(&self, address: &Address, block: BlockId) -> Result<Bytes, Error> {
let (data, decoder) = private_contract::functions::state::call(); let (data, decoder) = private_contract::functions::state::call();
let value = self.client.call_contract(block, *address, data)?; let value = self.client.call_contract(block, *address, data)?;
let state = decoder.decode(&value).map_err(|e| Error::Call(format!("Contract call failed {:?}", e)))?; let state = decoder.decode(&value).map_err(|e| Error::Call(format!("Contract call failed {:?}", e)))?;
self.decrypt(address, &state) match self.use_offchain_storage {
true => Ok(state),
false => self.decrypt(address, &state),
}
} }
fn get_decrypted_code(&self, address: &Address, block: BlockId) -> Result<Bytes, Error> { fn get_decrypted_code(&self, address: &Address, block: BlockId) -> Result<Bytes, Error> {
@ -610,9 +743,14 @@ impl Provider {
}; };
(enc_code, self.encrypt(&contract_address, &Self::iv_from_transaction(transaction), &Self::snapshot_from_storage(&storage))?) (enc_code, self.encrypt(&contract_address, &Self::iv_from_transaction(transaction), &Self::snapshot_from_storage(&storage))?)
}; };
let mut saved_state = encrypted_storage;
if self.use_offchain_storage {
// Save state into the storage and return its hash
saved_state = self.state_storage.private_state_db().save_state(&saved_state)?.0.to_vec();
}
Ok(PrivateExecutionResult { Ok(PrivateExecutionResult {
code: encrypted_code, code: encrypted_code,
state: encrypted_storage, state: saved_state,
contract_address: contract_address, contract_address: contract_address,
result, result,
}) })
@ -739,6 +877,21 @@ impl Provider {
} }
} }
impl IoHandler<ClientIoMessage> for Provider {
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
if self.use_offchain_storage {
io.register_timer(STATE_RETRIEVAL_TIMER, STATE_RETRIEVAL_TICK).expect("Error registering state retrieval timer");
}
}
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
match timer {
STATE_RETRIEVAL_TIMER => self.state_storage.tick(&self.logging),
_ => warn!("IO service triggered unregistered timer '{}'", timer),
}
}
}
pub trait Importer { pub trait Importer {
/// Process received private transaction /// Process received private transaction
fn import_private_transaction(&self, _rlp: &[u8]) -> Result<H256, Error>; fn import_private_transaction(&self, _rlp: &[u8]) -> Result<H256, Error>;
@ -747,6 +900,9 @@ pub trait Importer {
/// ///
/// Creates corresponding public transaction if last required signature collected and sends it to the chain /// Creates corresponding public transaction if last required signature collected and sends it to the chain
fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<H256, Error>; fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<H256, Error>;
/// Function called when requested private state retrieved from peer and saved to DB.
fn private_state_synced(&self, hash: &H256) -> Result<(), String>;
} }
// TODO [ToDr] Offload more heavy stuff to the IoService thread. // TODO [ToDr] Offload more heavy stuff to the IoService thread.
@ -810,6 +966,23 @@ impl Importer for Arc<Provider> {
} }
Ok(private_hash) Ok(private_hash)
} }
fn private_state_synced(&self, hash: &H256) -> Result<(), String> {
trace!(target: "privatetx", "Private state synced, hash: {:?}", hash);
let provider = Arc::downgrade(self);
let completed_hash = *hash;
let result = self.channel.send(ClientIoMessage::execute(move |_| {
if let Some(provider) = provider.upgrade() {
if let Err(e) = provider.private_state_sync_completed(&completed_hash) {
warn!(target: "privatetx", "Unable to process the state synced signal: {}", e);
}
}
}));
if let Err(e) = result {
warn!(target: "privatetx", "Error sending private state synced message: {:?}", e);
}
Ok(())
}
} }
impl ChainNotify for Provider { impl ChainNotify for Provider {

View File

@ -70,6 +70,10 @@ impl Default for MonoTime {
pub enum PrivateTxStatus { pub enum PrivateTxStatus {
/// Private tx was created but no validation received yet /// Private tx was created but no validation received yet
Created, Created,
/// Private state not found locally and being retrived from peers
PrivateStateSync,
/// Retrieval of the private state failed, transaction not created
PrivateStateSyncFailed,
/// Several validators (but not all) validated the transaction /// Several validators (but not all) validated the transaction
Validating, Validating,
/// All validators has validated the private tx /// All validators has validated the private tx
@ -209,6 +213,17 @@ impl Logging {
/// Logs the creation of the private transaction /// Logs the creation of the private transaction
pub fn private_tx_created(&self, tx_hash: &H256, validators: &[Address]) { pub fn private_tx_created(&self, tx_hash: &H256, validators: &[Address]) {
let mut logs = self.logs.write();
if let Some(transaction_log) = logs.get_mut(&tx_hash) {
if transaction_log.status == PrivateTxStatus::PrivateStateSync {
// Transaction was already created before, its private state was being retrieved
transaction_log.status = PrivateTxStatus::Created;
return;
} else {
error!(target: "privatetx", "Attempt to create a duplicate transaction");
return;
}
}
let mut validator_logs = Vec::new(); let mut validator_logs = Vec::new();
for account in validators { for account in validators {
validator_logs.push(ValidatorLog { validator_logs.push(ValidatorLog {
@ -216,7 +231,6 @@ impl Logging {
validation_timestamp: None, validation_timestamp: None,
}); });
} }
let mut logs = self.logs.write();
if logs.len() > MAX_JOURNAL_LEN { if logs.len() > MAX_JOURNAL_LEN {
// Remove the oldest log // Remove the oldest log
if let Some(tx_hash) = logs.values() if let Some(tx_hash) = logs.values()
@ -236,6 +250,22 @@ impl Logging {
}); });
} }
/// Private state retrieval started
pub fn private_state_request(&self, tx_hash: &H256) {
let mut logs = self.logs.write();
if let Some(transaction_log) = logs.get_mut(&tx_hash) {
transaction_log.status = PrivateTxStatus::PrivateStateSync;
}
}
/// Private state retrieval failed
pub fn private_state_sync_failed(&self, tx_hash: &H256) {
let mut logs = self.logs.write();
if let Some(transaction_log) = logs.get_mut(&tx_hash) {
transaction_log.status = PrivateTxStatus::PrivateStateSyncFailed;
}
}
/// Logs the validation of the private transaction by one of its validators /// Logs the validation of the private transaction by one of its validators
pub fn signature_added(&self, tx_hash: &H256, validator: &Address) { pub fn signature_added(&self, tx_hash: &H256, validator: &Address) {
let mut logs = self.logs.write(); let mut logs = self.logs.write();

View File

@ -0,0 +1,62 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use ethereum_types::H256;
use bytes::Bytes;
use kvdb::{KeyValueDB, DBTransaction};
use keccak_hasher::KeccakHasher;
use hash_db::Hasher;
use ethcore_db::COL_PRIVATE_TRANSACTIONS_STATE;
use error::Error;
/// Wrapper around local db with private state for sync purposes
pub struct PrivateStateDB {
db: Arc<KeyValueDB>,
}
impl PrivateStateDB {
/// Constructs the object
pub fn new(db: Arc<KeyValueDB>) -> Self {
PrivateStateDB {
db,
}
}
/// Returns saved state for the hash
pub fn state(&self, state_hash: &H256) -> Result<Bytes, Error> {
trace!(target: "privatetx", "Retrieve private state from db with hash: {:?}", state_hash);
self.db.get(COL_PRIVATE_TRANSACTIONS_STATE, state_hash.as_bytes())
.expect("Low-level database error. Some issue with your hard disk?")
.map(|s| s.to_vec())
.ok_or(Error::PrivateStateNotFound)
}
/// Stores state for the hash
pub fn save_state(&self, storage: &Bytes) -> Result<H256, Error> {
let state_hash = self.state_hash(storage)?;
let mut transaction = DBTransaction::new();
transaction.put(COL_PRIVATE_TRANSACTIONS_STATE, state_hash.as_bytes(), storage);
self.db.write(transaction).map_err(|_| Error::DatabaseWriteError)?;
trace!(target: "privatetx", "Private state saved to db, its hash: {:?}", state_hash);
Ok(state_hash)
}
/// Returns state's hash without committing it to DB
pub fn state_hash(&self, state: &Bytes) -> Result<H256, Error> {
Ok(KeccakHasher::hash(state))
}
}

View File

@ -0,0 +1,165 @@
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashSet, HashMap};
use std::sync::Arc;
use std::time::{Instant, Duration};
use parking_lot::RwLock;
use ethereum_types::H256;
use kvdb::KeyValueDB;
use types::transaction::SignedTransaction;
use private_transactions::VerifiedPrivateTransaction;
use private_state_db::PrivateStateDB;
use log::Logging;
/// Max duration of retrieving state (in ms)
const MAX_REQUEST_SESSION_DURATION: u64 = 120 * 1000;
/// Type of the stored reques
pub enum RequestType {
/// Verification of private transaction
Verification(Arc<VerifiedPrivateTransaction>),
/// Creation of the private transaction
Creation(SignedTransaction),
}
#[derive(Clone, PartialEq)]
enum RequestState {
Syncing,
Ready,
}
struct StateRequest {
request_type: RequestType,
request_hashes: HashSet<H256>,
state: RequestState,
}
/// Wrapper over storage for the private states
pub struct PrivateStateStorage {
private_state_db: Arc<PrivateStateDB>,
requests: RwLock<Vec<StateRequest>>,
syncing_hashes: RwLock<HashMap<H256, Instant>>,
}
impl PrivateStateStorage {
/// Constructs the object
pub fn new(db: Arc<KeyValueDB>) -> Self {
PrivateStateStorage {
private_state_db: Arc::new(PrivateStateDB::new(db)),
requests: RwLock::new(Vec::new()),
syncing_hashes: RwLock::default(),
}
}
/// Checks if ready for processing requests exist in queue
pub fn requests_ready(&self) -> bool {
let requests = self.requests.read();
requests.iter().find(|r| r.state == RequestState::Ready).is_some()
}
/// Signals that corresponding private state retrieved and added into the local db
pub fn state_sync_completed(&self, synced_state_hash: &H256) {
let mut syncing_hashes = self.syncing_hashes.write();
syncing_hashes.remove(synced_state_hash);
self.mark_hash_ready(synced_state_hash);
}
/// Returns underlying DB
pub fn private_state_db(&self) -> Arc<PrivateStateDB> {
self.private_state_db.clone()
}
/// Store a request for state's sync and later processing, returns new hashes, which sync is required
pub fn add_request(&self, request_type: RequestType, request_hashes: HashSet<H256>) -> Vec<H256> {
let request = StateRequest {
request_type: request_type,
request_hashes: request_hashes.clone(),
state: RequestState::Syncing,
};
let mut requests = self.requests.write();
requests.push(request);
let mut new_hashes = Vec::new();
for hash in request_hashes {
let mut hashes = self.syncing_hashes.write();
if hashes.insert(hash, Instant::now() + Duration::from_millis(MAX_REQUEST_SESSION_DURATION)).is_none() {
new_hashes.push(hash);
}
}
new_hashes
}
/// Drains ready requests to process
pub fn drain_ready_requests(&self) -> Vec<RequestType> {
let mut requests_queue = self.requests.write();
let mut drained = Vec::new();
let mut i = 0;
while i != requests_queue.len() {
if requests_queue[i].state == RequestState::Ready {
let request = requests_queue.remove(i);
drained.push(request.request_type);
} else {
i += 1;
}
}
drained
}
/// State retrieval timer's tick
pub fn tick(&self, logging: &Option<Logging>) {
let mut syncing_hashes = self.syncing_hashes.write();
let current_time = Instant::now();
syncing_hashes
.iter()
.filter(|&(_, expiration_time)| *expiration_time >= current_time)
.for_each(|(hash, _)| self.mark_hash_stale(&hash, logging));
syncing_hashes.retain(|_, expiration_time| *expiration_time < current_time);
}
fn mark_hash_ready(&self, ready_hash: &H256) {
let mut requests = self.requests.write();
for request in requests.iter_mut() {
request.request_hashes.remove(ready_hash);
if request.request_hashes.is_empty() && request.state == RequestState::Syncing {
request.state = RequestState::Ready;
}
}
}
fn mark_hash_stale(&self, stale_hash: &H256, logging: &Option<Logging>) {
let mut requests = self.requests.write();
requests.retain(|request| {
let mut delete_request = false;
if request.request_hashes.contains(stale_hash) {
let tx_hash;
match &request.request_type {
RequestType::Verification(transaction) => {
tx_hash = transaction.transaction_hash;
}
RequestType::Creation(transaction) => {
tx_hash = transaction.hash();
if let Some(ref logging) = logging {
logging.private_state_sync_failed(&tx_hash);
}
}
}
trace!(target: "privatetx", "Private state request for {:?} staled due to timeout", &tx_hash);
delete_request = true;
}
!delete_request
});
}
}

View File

@ -37,7 +37,7 @@ use types::ids::BlockId;
use types::transaction::{Transaction, Action}; use types::transaction::{Transaction, Action};
use ethcore::{ use ethcore::{
CreateContractAddress, CreateContractAddress,
test_helpers::{generate_dummy_client, push_block_with_transactions}, test_helpers::{generate_dummy_client, push_block_with_transactions, new_db},
miner::Miner, miner::Miner,
spec, spec,
}; };
@ -65,11 +65,13 @@ fn private_contract() {
validator_accounts: vec![key3.address(), key4.address()], validator_accounts: vec![key3.address(), key4.address()],
signer_account: None, signer_account: None,
logs_path: None, logs_path: None,
use_offchain_storage: false,
}; };
let io = ethcore_io::IoChannel::disconnected(); let io = ethcore_io::IoChannel::disconnected();
let miner = Arc::new(Miner::new_for_tests(&spec::new_test(), None)); let miner = Arc::new(Miner::new_for_tests(&spec::new_test(), None));
let private_keys = Arc::new(StoringKeyProvider::default()); let private_keys = Arc::new(StoringKeyProvider::default());
let db = new_db();
let pm = Arc::new(Provider::new( let pm = Arc::new(Provider::new(
client.clone(), client.clone(),
miner, miner,
@ -78,6 +80,7 @@ fn private_contract() {
config, config,
io, io,
private_keys, private_keys,
db.key_value().clone(),
)); ));
let (address, _) = contract_address(CreateContractAddress::FromSenderAndNonce, &key1.address(), &0.into(), &[]); let (address, _) = contract_address(CreateContractAddress::FromSenderAndNonce, &key1.address(), &0.into(), &[]);
@ -200,11 +203,13 @@ fn call_other_private_contract() {
validator_accounts: vec![key3.address(), key4.address()], validator_accounts: vec![key3.address(), key4.address()],
signer_account: None, signer_account: None,
logs_path: None, logs_path: None,
use_offchain_storage: false,
}; };
let io = ethcore_io::IoChannel::disconnected(); let io = ethcore_io::IoChannel::disconnected();
let miner = Arc::new(Miner::new_for_tests(&spec::new_test(), None)); let miner = Arc::new(Miner::new_for_tests(&spec::new_test(), None));
let private_keys = Arc::new(StoringKeyProvider::default()); let private_keys = Arc::new(StoringKeyProvider::default());
let db = new_db();
let pm = Arc::new(Provider::new( let pm = Arc::new(Provider::new(
client.clone(), client.clone(),
miner, miner,
@ -213,6 +218,7 @@ fn call_other_private_contract() {
config, config,
io, io,
private_keys.clone(), private_keys.clone(),
db.key_value().clone(),
)); ));
// Deploy contract A // Deploy contract A

View File

@ -73,6 +73,16 @@ impl PrivateTxHandler for PrivateTxService {
} }
} }
} }
fn private_state_synced(&self, hash: &H256) -> Result<(), String> {
match self.provider.private_state_synced(hash) {
Ok(handle_result) => Ok(handle_result),
Err(err) => {
warn!(target: "privatetx", "Unable to handle private state synced message: {}", err);
return Err(err.to_string())
}
}
}
} }
/// Client service setup. Creates and registers client and network services with the IO subsystem. /// Client service setup. Creates and registers client and network services with the IO subsystem.
@ -139,8 +149,10 @@ impl ClientService {
private_tx_conf, private_tx_conf,
io_service.channel(), io_service.channel(),
private_keys, private_keys,
blockchain_db.key_value().clone(),
)); ));
let private_tx = Arc::new(PrivateTxService::new(provider)); let private_tx = Arc::new(PrivateTxService::new(provider.clone()));
io_service.register_handler(provider)?;
let client_io = Arc::new(ClientIoHandler { let client_io = Arc::new(ClientIoHandler {
client: client.clone(), client: client.clone(),

View File

@ -29,6 +29,8 @@ pub enum ChainMessageType {
PrivateTransaction(H256, Vec<u8>), PrivateTransaction(H256, Vec<u8>),
/// Message with signed private transaction /// Message with signed private transaction
SignedPrivateTransaction(H256, Vec<u8>), SignedPrivateTransaction(H256, Vec<u8>),
/// Private state request for the particular private contract
PrivateStateRequest(H256),
} }
/// Route type to indicate whether it is enacted or retracted. /// Route type to indicate whether it is enacted or retracted.

View File

@ -514,6 +514,7 @@ impl ChainNotify for TestNotify {
ChainMessageType::Consensus(data) => data, ChainMessageType::Consensus(data) => data,
ChainMessageType::SignedPrivateTransaction(_, data) => data, ChainMessageType::SignedPrivateTransaction(_, data) => data,
ChainMessageType::PrivateTransaction(_, data) => data, ChainMessageType::PrivateTransaction(_, data) => data,
ChainMessageType::PrivateStateRequest(_) => Vec::new(),
}; };
self.messages.write().push(data); self.messages.write().push(data);
} }

View File

@ -18,6 +18,7 @@ ethcore-light = { path = "../light" }
ethcore-network = { path = "../../util/network" } ethcore-network = { path = "../../util/network" }
ethcore-network-devp2p = { path = "../../util/network-devp2p" } ethcore-network-devp2p = { path = "../../util/network-devp2p" }
ethereum-types = "0.6.0" ethereum-types = "0.6.0"
ethcore-private-tx = { path = "../private-tx" }
ethkey = { path = "../../accounts/ethkey" } ethkey = { path = "../../accounts/ethkey" }
ethstore = { path = "../../accounts/ethstore" } ethstore = { path = "../../accounts/ethstore" }
fastmap = { path = "../../util/fastmap" } fastmap = { path = "../../util/fastmap" }
@ -42,7 +43,6 @@ parity-runtime = { path = "../../util/runtime" }
env_logger = "0.5" env_logger = "0.5"
ethcore = { path = "..", features = ["test-helpers"] } ethcore = { path = "..", features = ["test-helpers"] }
ethcore-io = { path = "../../util/io", features = ["mio"] } ethcore-io = { path = "../../util/io", features = ["mio"] }
ethcore-private-tx = { path = "../private-tx" }
kvdb-memorydb = "0.1" kvdb-memorydb = "0.1"
rustc-hex = "1.0" rustc-hex = "1.0"
rand_xorshift = "0.1.1" rand_xorshift = "0.1.1"

View File

@ -35,6 +35,7 @@ use ethkey::Secret;
use ethcore::client::{ChainNotify, NewBlocks, ChainMessageType}; use ethcore::client::{ChainNotify, NewBlocks, ChainMessageType};
use client_traits::BlockChainClient; use client_traits::BlockChainClient;
use ethcore::snapshot::SnapshotService; use ethcore::snapshot::SnapshotService;
use ethcore_private_tx::PrivateStateDB;
use types::BlockNumber; use types::BlockNumber;
use sync_io::NetSyncIo; use sync_io::NetSyncIo;
use chain::{ChainSyncApi, SyncStatus as EthSyncStatus}; use chain::{ChainSyncApi, SyncStatus as EthSyncStatus};
@ -42,7 +43,7 @@ use std::net::{SocketAddr, AddrParseError};
use std::str::FromStr; use std::str::FromStr;
use parking_lot::{RwLock, Mutex}; use parking_lot::{RwLock, Mutex};
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62, use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, SyncState}; PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4, SyncState};
use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket}; use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket};
use light::client::AsLightClient; use light::client::AsLightClient;
use light::Provider; use light::Provider;
@ -263,6 +264,8 @@ pub struct Params {
pub snapshot_service: Arc<dyn SnapshotService>, pub snapshot_service: Arc<dyn SnapshotService>,
/// Private tx service. /// Private tx service.
pub private_tx_handler: Option<Arc<dyn PrivateTxHandler>>, pub private_tx_handler: Option<Arc<dyn PrivateTxHandler>>,
/// Private state wrapper
pub private_state: Option<Arc<PrivateStateDB>>,
/// Light data provider. /// Light data provider.
pub provider: Arc<dyn (::light::Provider)>, pub provider: Arc<dyn (::light::Provider)>,
/// Network layer configuration. /// Network layer configuration.
@ -377,6 +380,7 @@ impl EthSync {
chain: params.chain, chain: params.chain,
snapshot_service: params.snapshot_service, snapshot_service: params.snapshot_service,
overlay: RwLock::new(HashMap::new()), overlay: RwLock::new(HashMap::new()),
private_state: params.private_state,
}), }),
light_proto: light_proto, light_proto: light_proto,
subprotocol_name: params.config.subprotocol_name, subprotocol_name: params.config.subprotocol_name,
@ -460,6 +464,8 @@ struct SyncProtocolHandler {
sync: ChainSyncApi, sync: ChainSyncApi,
/// Chain overlay used to cache data such as fork block. /// Chain overlay used to cache data such as fork block.
overlay: RwLock<HashMap<BlockNumber, Bytes>>, overlay: RwLock<HashMap<BlockNumber, Bytes>>,
/// Private state db
private_state: Option<Arc<PrivateStateDB>>,
} }
impl NetworkProtocolHandler for SyncProtocolHandler { impl NetworkProtocolHandler for SyncProtocolHandler {
@ -475,7 +481,12 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
} }
fn read(&self, io: &dyn NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { fn read(&self, io: &dyn NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
self.sync.dispatch_packet(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data); self.sync.dispatch_packet(&mut NetSyncIo::new(io,
&*self.chain,
&*self.snapshot_service,
&self.overlay,
self.private_state.clone()),
*peer, packet_id, data);
} }
fn connected(&self, io: &dyn NetworkContext, peer: &PeerId) { fn connected(&self, io: &dyn NetworkContext, peer: &PeerId) {
@ -484,20 +495,30 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
let warp_protocol = io.protocol_version(WARP_SYNC_PROTOCOL_ID, *peer).unwrap_or(0) != 0; let warp_protocol = io.protocol_version(WARP_SYNC_PROTOCOL_ID, *peer).unwrap_or(0) != 0;
let warp_context = io.subprotocol_name() == WARP_SYNC_PROTOCOL_ID; let warp_context = io.subprotocol_name() == WARP_SYNC_PROTOCOL_ID;
if warp_protocol == warp_context { if warp_protocol == warp_context {
self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer); self.sync.write().on_peer_connected(&mut NetSyncIo::new(io,
&*self.chain,
&*self.snapshot_service,
&self.overlay,
self.private_state.clone()),
*peer);
} }
} }
fn disconnected(&self, io: &dyn NetworkContext, peer: &PeerId) { fn disconnected(&self, io: &dyn NetworkContext, peer: &PeerId) {
trace_time!("sync::disconnected"); trace_time!("sync::disconnected");
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer); self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io,
&*self.chain,
&*self.snapshot_service,
&self.overlay,
self.private_state.clone()),
*peer);
} }
} }
fn timeout(&self, io: &dyn NetworkContext, timer: TimerToken) { fn timeout(&self, io: &dyn NetworkContext, timer: TimerToken) {
trace_time!("sync::timeout"); trace_time!("sync::timeout");
let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay); let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay, self.private_state.clone());
match timer { match timer {
PEERS_TIMER => self.sync.write().maintain_peers(&mut io), PEERS_TIMER => self.sync.write().maintain_peers(&mut io),
MAINTAIN_SYNC_TIMER => self.sync.write().maintain_sync(&mut io), MAINTAIN_SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
@ -528,8 +549,11 @@ impl ChainNotify for EthSync {
use light::net::Announcement; use light::net::Announcement;
self.network.with_context(self.subprotocol_name, |context| { self.network.with_context(self.subprotocol_name, |context| {
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service, let mut sync_io = NetSyncIo::new(context,
&self.eth_handler.overlay); &*self.eth_handler.chain,
&*self.eth_handler.snapshot_service,
&self.eth_handler.overlay,
self.eth_handler.private_state.clone());
self.eth_handler.sync.write().chain_new_blocks( self.eth_handler.sync.write().chain_new_blocks(
&mut sync_io, &mut sync_io,
&new_blocks.imported, &new_blocks.imported,
@ -576,7 +600,7 @@ impl ChainNotify for EthSync {
self.network.register_protocol(self.eth_handler.clone(), self.subprotocol_name, &[ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63]) self.network.register_protocol(self.eth_handler.clone(), self.subprotocol_name, &[ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63])
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e)); .unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
// register the warp sync subprotocol // register the warp sync subprotocol
self.network.register_protocol(self.eth_handler.clone(), WARP_SYNC_PROTOCOL_ID, &[PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3]) self.network.register_protocol(self.eth_handler.clone(), WARP_SYNC_PROTOCOL_ID, &[PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4])
.unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e)); .unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e));
// register the light protocol. // register the light protocol.
@ -593,13 +617,19 @@ impl ChainNotify for EthSync {
fn broadcast(&self, message_type: ChainMessageType) { fn broadcast(&self, message_type: ChainMessageType) {
self.network.with_context(WARP_SYNC_PROTOCOL_ID, |context| { self.network.with_context(WARP_SYNC_PROTOCOL_ID, |context| {
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service, &self.eth_handler.overlay); let mut sync_io = NetSyncIo::new(context,
&*self.eth_handler.chain,
&*self.eth_handler.snapshot_service,
&self.eth_handler.overlay,
self.eth_handler.private_state.clone());
match message_type { match message_type {
ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message), ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message),
ChainMessageType::PrivateTransaction(transaction_hash, message) => ChainMessageType::PrivateTransaction(transaction_hash, message) =>
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PrivateTransactionPacket, message), self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PrivateTransactionPacket, message),
ChainMessageType::SignedPrivateTransaction(transaction_hash, message) => ChainMessageType::SignedPrivateTransaction(transaction_hash, message) =>
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SignedPrivateTransactionPacket, message), self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SignedPrivateTransactionPacket, message),
ChainMessageType::PrivateStateRequest(hash) =>
self.eth_handler.sync.write().request_private_state(&mut sync_io, &hash),
} }
}); });
} }
@ -664,7 +694,11 @@ impl ManageNetwork for EthSync {
fn stop_network(&self) { fn stop_network(&self) {
self.network.with_context(self.subprotocol_name, |context| { self.network.with_context(self.subprotocol_name, |context| {
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service, &self.eth_handler.overlay); let mut sync_io = NetSyncIo::new(context,
&*self.eth_handler.chain,
&*self.eth_handler.snapshot_service,
&self.eth_handler.overlay,
self.eth_handler.private_state.clone());
self.eth_handler.sync.write().abort(&mut sync_io); self.eth_handler.sync.write().abort(&mut sync_io);
}); });

View File

@ -691,7 +691,7 @@ mod tests {
let mut chain = TestBlockChainClient::new(); let mut chain = TestBlockChainClient::new();
let snapshot_service = TestSnapshotService::new(); let snapshot_service = TestSnapshotService::new();
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None, None);
// Valid headers sequence. // Valid headers sequence.
let valid_headers = [ let valid_headers = [
@ -757,7 +757,7 @@ mod tests {
let mut chain = TestBlockChainClient::new(); let mut chain = TestBlockChainClient::new();
let snapshot_service = TestSnapshotService::new(); let snapshot_service = TestSnapshotService::new();
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None, None);
let mut headers = Vec::with_capacity(3); let mut headers = Vec::with_capacity(3);
let parent_hash = H256::random(); let parent_hash = H256::random();
@ -807,7 +807,7 @@ mod tests {
let mut chain = TestBlockChainClient::new(); let mut chain = TestBlockChainClient::new();
let snapshot_service = TestSnapshotService::new(); let snapshot_service = TestSnapshotService::new();
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None, None);
// Import block headers. // Import block headers.
let mut headers = Vec::with_capacity(4); let mut headers = Vec::with_capacity(4);
@ -875,7 +875,7 @@ mod tests {
let mut chain = TestBlockChainClient::new(); let mut chain = TestBlockChainClient::new();
let snapshot_service = TestSnapshotService::new(); let snapshot_service = TestSnapshotService::new();
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None, None);
// Import block headers. // Import block headers.
let mut headers = Vec::with_capacity(4); let mut headers = Vec::with_capacity(4);
@ -940,7 +940,7 @@ mod tests {
let mut chain = TestBlockChainClient::new(); let mut chain = TestBlockChainClient::new();
let snapshot_service = TestSnapshotService::new(); let snapshot_service = TestSnapshotService::new();
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None, None);
let heads = [ let heads = [
spec.genesis_header(), spec.genesis_header(),
@ -980,7 +980,7 @@ mod tests {
let mut chain = TestBlockChainClient::new(); let mut chain = TestBlockChainClient::new();
let snapshot_service = TestSnapshotService::new(); let snapshot_service = TestSnapshotService::new();
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None, None);
let heads = [ let heads = [
spec.genesis_header() spec.genesis_header()

View File

@ -48,6 +48,7 @@ use super::sync_packet::SyncPacket::{
SnapshotDataPacket, SnapshotDataPacket,
PrivateTransactionPacket, PrivateTransactionPacket,
SignedPrivateTransactionPacket, SignedPrivateTransactionPacket,
PrivateStatePacket,
}; };
use super::{ use super::{
@ -65,6 +66,7 @@ use super::{
MAX_NEW_HASHES, MAX_NEW_HASHES,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_1,
PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_3,
PAR_PROTOCOL_VERSION_4,
}; };
/// The Chain Sync Handler: handles responses from peers /// The Chain Sync Handler: handles responses from peers
@ -86,6 +88,7 @@ impl SyncHandler {
SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp), SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp), PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
SignedPrivateTransactionPacket => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp), SignedPrivateTransactionPacket => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp),
PrivateStatePacket => SyncHandler::on_private_state_data(sync, io, peer, &rlp),
_ => { _ => {
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id()); debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id());
Ok(()) Ok(())
@ -585,6 +588,7 @@ impl SyncHandler {
asking: PeerAsking::Nothing, asking: PeerAsking::Nothing,
asking_blocks: Vec::new(), asking_blocks: Vec::new(),
asking_hash: None, asking_hash: None,
asking_private_state: None,
ask_time: Instant::now(), ask_time: Instant::now(),
last_sent_transactions: Default::default(), last_sent_transactions: Default::default(),
last_sent_private_transactions: Default::default(), last_sent_private_transactions: Default::default(),
@ -635,7 +639,7 @@ impl SyncHandler {
} }
if false if false
|| (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_3.0)) || (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_4.0))
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_62.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_63.0)) || (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_62.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_63.0))
{ {
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version); trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
@ -696,7 +700,7 @@ impl SyncHandler {
return Ok(()); return Ok(());
} }
}; };
trace!(target: "sync", "Received signed private transaction packet from {:?}", peer_id); trace!(target: "privatetx", "Received signed private transaction packet from {:?}", peer_id);
match private_handler.import_signed_private_transaction(r.as_raw()) { match private_handler.import_signed_private_transaction(r.as_raw()) {
Ok(transaction_hash) => { Ok(transaction_hash) => {
//don't send the packet back //don't send the packet back
@ -705,7 +709,7 @@ impl SyncHandler {
} }
}, },
Err(e) => { Err(e) => {
trace!(target: "sync", "Ignoring the message, error queueing: {}", e); trace!(target: "privatetx", "Ignoring the message, error queueing: {}", e);
} }
} }
Ok(()) Ok(())
@ -724,7 +728,7 @@ impl SyncHandler {
return Ok(()); return Ok(());
} }
}; };
trace!(target: "sync", "Received private transaction packet from {:?}", peer_id); trace!(target: "privatetx", "Received private transaction packet from {:?}", peer_id);
match private_handler.import_private_transaction(r.as_raw()) { match private_handler.import_private_transaction(r.as_raw()) {
Ok(transaction_hash) => { Ok(transaction_hash) => {
//don't send the packet back //don't send the packet back
@ -733,11 +737,63 @@ impl SyncHandler {
} }
}, },
Err(e) => { Err(e) => {
trace!(target: "sync", "Ignoring the message, error queueing: {}", e); trace!(target: "privatetx", "Ignoring the message, error queueing: {}", e);
} }
} }
Ok(()) Ok(())
} }
fn on_private_state_data(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
return Ok(());
}
if !sync.reset_peer_asking(peer_id, PeerAsking::PrivateState) {
trace!(target: "sync", "{}: Ignored unexpected private state data", peer_id);
return Ok(());
}
let requested_hash = sync.peers.get(&peer_id).and_then(|p| p.asking_private_state);
let requested_hash = match requested_hash {
Some(hash) => hash,
None => {
debug!(target: "sync", "{}: Ignored unexpected private state (requested_hash is None)", peer_id);
return Ok(());
}
};
let private_handler = match sync.private_tx_handler {
Some(ref handler) => handler,
None => {
trace!(target: "sync", "{} Ignoring private tx packet from peer", peer_id);
return Ok(());
}
};
trace!(target: "privatetx", "Received private state data packet from {:?}", peer_id);
let private_state_data: Bytes = r.val_at(0)?;
match io.private_state() {
Some(db) => {
// Check hash of the rececived data before submitting it to DB
let received_hash = db.state_hash(&private_state_data).unwrap_or_default();
if received_hash != requested_hash {
trace!(target: "sync", "{} Ignoring private state data with unexpected hash from peer", peer_id);
return Ok(());
}
match db.save_state(&private_state_data) {
Ok(hash) => {
if let Err(err) = private_handler.private_state_synced(&hash) {
trace!(target: "privatetx", "Ignoring received private state message, error queueing: {}", err);
}
}
Err(e) => {
error!(target: "privatetx", "Cannot save received private state {:?}", e);
}
}
}
None => {
trace!(target: "sync", "{} Ignoring private tx packet from peer", peer_id);
}
};
Ok(())
}
} }
#[cfg(test)] #[cfg(test)]
@ -765,7 +821,7 @@ mod tests {
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let hashes_data = get_dummy_hashes(); let hashes_data = get_dummy_hashes();
let hashes_rlp = Rlp::new(&hashes_data); let hashes_rlp = Rlp::new(&hashes_data);
@ -786,7 +842,7 @@ mod tests {
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
//sync.have_common_block = true; //sync.have_common_block = true;
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let block = Rlp::new(&block_data); let block = Rlp::new(&block_data);
@ -805,7 +861,7 @@ mod tests {
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let block = Rlp::new(&block_data); let block = Rlp::new(&block_data);
@ -819,7 +875,7 @@ mod tests {
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let empty_data = vec![]; let empty_data = vec![];
let block = Rlp::new(&empty_data); let block = Rlp::new(&empty_data);
@ -836,7 +892,7 @@ mod tests {
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let empty_hashes_data = vec![]; let empty_hashes_data = vec![];
let hashes_rlp = Rlp::new(&empty_hashes_data); let hashes_rlp = Rlp::new(&empty_hashes_data);

View File

@ -152,6 +152,8 @@ pub const PAR_PROTOCOL_VERSION_1: (u8, u8) = (1, 0x15);
pub const PAR_PROTOCOL_VERSION_2: (u8, u8) = (2, 0x16); pub const PAR_PROTOCOL_VERSION_2: (u8, u8) = (2, 0x16);
/// 3 version of Parity protocol (private transactions messages added). /// 3 version of Parity protocol (private transactions messages added).
pub const PAR_PROTOCOL_VERSION_3: (u8, u8) = (3, 0x18); pub const PAR_PROTOCOL_VERSION_3: (u8, u8) = (3, 0x18);
/// 4 version of Parity protocol (private state sync added).
pub const PAR_PROTOCOL_VERSION_4: (u8, u8) = (4, 0x20);
pub const MAX_BODIES_TO_SEND: usize = 256; pub const MAX_BODIES_TO_SEND: usize = 256;
pub const MAX_HEADERS_TO_SEND: usize = 512; pub const MAX_HEADERS_TO_SEND: usize = 512;
@ -179,6 +181,7 @@ const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10);
const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3); const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3);
const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5); const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5);
const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120); const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120);
const PRIVATE_STATE_TIMEOUT: Duration = Duration::from_secs(120);
/// Defines how much time we have to complete priority transaction or block propagation. /// Defines how much time we have to complete priority transaction or block propagation.
/// after the deadline is reached the task is considered finished /// after the deadline is reached the task is considered finished
@ -277,6 +280,7 @@ pub enum PeerAsking {
BlockReceipts, BlockReceipts,
SnapshotManifest, SnapshotManifest,
SnapshotData, SnapshotData,
PrivateState,
} }
#[derive(PartialEq, Eq, Debug, Clone, Copy, MallocSizeOf)] #[derive(PartialEq, Eq, Debug, Clone, Copy, MallocSizeOf)]
@ -316,6 +320,8 @@ pub struct PeerInfo {
asking_blocks: Vec<H256>, asking_blocks: Vec<H256>,
/// Holds requested header hash if currently requesting block header by hash /// Holds requested header hash if currently requesting block header by hash
asking_hash: Option<H256>, asking_hash: Option<H256>,
/// Holds requested private state hash
asking_private_state: Option<H256>,
/// Holds requested snapshot chunk hash if any. /// Holds requested snapshot chunk hash if any.
asking_snapshot_data: Option<H256>, asking_snapshot_data: Option<H256>,
/// Request timestamp /// Request timestamp
@ -352,6 +358,7 @@ impl PeerInfo {
fn reset_asking(&mut self) { fn reset_asking(&mut self) {
self.asking_blocks.clear(); self.asking_blocks.clear();
self.asking_hash = None; self.asking_hash = None;
self.asking_private_state = None;
// mark any pending requests as expired // mark any pending requests as expired
if self.asking != PeerAsking::Nothing && self.is_allowed() { if self.asking != PeerAsking::Nothing && self.is_allowed() {
self.expired = true; self.expired = true;
@ -1185,6 +1192,7 @@ impl ChainSync {
PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT, PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT,
PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT, PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT,
PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT, PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT,
PeerAsking::PrivateState => elapsed > PRIVATE_STATE_TIMEOUT,
}; };
if timeout { if timeout {
debug!(target:"sync", "Timeout {}", peer_id); debug!(target:"sync", "Timeout {}", peer_id);
@ -1291,6 +1299,18 @@ impl ChainSync {
).collect() ).collect()
} }
fn get_private_state_peers(&self) -> Vec<PeerId> {
self.peers.iter().filter_map(
|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_4.0
&& p.private_tx_enabled
&& self.active_peers.contains(id) {
Some(*id)
} else {
None
}
).collect()
}
/// Maintain other peers. Send out any new blocks and transactions /// Maintain other peers. Send out any new blocks and transactions
pub fn maintain_sync(&mut self, io: &mut dyn SyncIo) { pub fn maintain_sync(&mut self, io: &mut dyn SyncIo) {
self.maybe_start_snapshot_sync(io); self.maybe_start_snapshot_sync(io);
@ -1360,6 +1380,19 @@ impl ChainSync {
pub fn propagate_private_transaction(&mut self, io: &mut dyn SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) { pub fn propagate_private_transaction(&mut self, io: &mut dyn SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) {
SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet); SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet);
} }
/// Request private state from peers
pub fn request_private_state(&mut self, io: &mut SyncIo, hash: &H256) {
let private_state_peers = self.get_private_state_peers();
if private_state_peers.is_empty() {
error!(target: "privatetx", "Cannot request private state, no peers with private tx enabled available");
} else {
trace!(target: "privatetx", "Requesting private stats from {:?}", private_state_peers);
for peer_id in private_state_peers {
SyncRequester::request_private_state(self, io, peer_id, hash);
}
}
}
} }
#[cfg(test)] #[cfg(test)]
@ -1480,6 +1513,7 @@ pub mod tests {
asking: PeerAsking::Nothing, asking: PeerAsking::Nothing,
asking_blocks: Vec::new(), asking_blocks: Vec::new(),
asking_hash: None, asking_hash: None,
asking_private_state: None,
ask_time: Instant::now(), ask_time: Instant::now(),
last_sent_transactions: Default::default(), last_sent_transactions: Default::default(),
last_sent_private_transactions: Default::default(), last_sent_private_transactions: Default::default(),
@ -1533,7 +1567,7 @@ pub mod tests {
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let peers = sync.get_lagging_peers(&chain_info); let peers = sync.get_lagging_peers(&chain_info);
SyncPropagator::propagate_new_hashes(&mut sync, &chain_info, &mut io, &peers); SyncPropagator::propagate_new_hashes(&mut sync, &chain_info, &mut io, &peers);
@ -1553,7 +1587,7 @@ pub mod tests {
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let peers = sync.get_lagging_peers(&chain_info); let peers = sync.get_lagging_peers(&chain_info);
SyncPropagator::propagate_blocks(&mut sync, &chain_info, &mut io, &[], &peers); SyncPropagator::propagate_blocks(&mut sync, &chain_info, &mut io, &[], &peers);
@ -1591,7 +1625,7 @@ pub mod tests {
{ {
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks, false); io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks, false);
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]); sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]);
assert_eq!(io.chain.miner.ready_transactions(io.chain, 10, PendingOrdering::Priority).len(), 1); assert_eq!(io.chain.miner.ready_transactions(io.chain, 10, PendingOrdering::Priority).len(), 1);
@ -1604,7 +1638,7 @@ pub mod tests {
{ {
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&client, &ss, &queue, None); let mut io = TestIo::new(&client, &ss, &queue, None, None);
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &good_blocks, &retracted_blocks, false); io.chain.miner.chain_new_blocks(io.chain, &[], &[], &good_blocks, &retracted_blocks, false);
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[], &[]); sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[], &[]);
} }
@ -1627,7 +1661,7 @@ pub mod tests {
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
// when // when
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]); sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]);

View File

@ -355,7 +355,7 @@ mod tests {
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let peers = sync.get_lagging_peers(&chain_info); let peers = sync.get_lagging_peers(&chain_info);
let peer_count = SyncPropagator::propagate_new_hashes(&mut sync, &chain_info, &mut io, &peers); let peer_count = SyncPropagator::propagate_new_hashes(&mut sync, &chain_info, &mut io, &peers);
@ -376,7 +376,7 @@ mod tests {
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let peers = sync.get_lagging_peers(&chain_info); let peers = sync.get_lagging_peers(&chain_info);
let peer_count = SyncPropagator::propagate_blocks(&mut sync, &chain_info, &mut io, &[], &peers); let peer_count = SyncPropagator::propagate_blocks(&mut sync, &chain_info, &mut io, &[], &peers);
@ -397,7 +397,7 @@ mod tests {
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let peers = sync.get_lagging_peers(&chain_info); let peers = sync.get_lagging_peers(&chain_info);
let peer_count = SyncPropagator::propagate_blocks(&mut sync ,&chain_info, &mut io, &[hash.clone()], &peers); let peer_count = SyncPropagator::propagate_blocks(&mut sync ,&chain_info, &mut io, &[hash.clone()], &peers);
@ -427,6 +427,7 @@ mod tests {
asking: PeerAsking::Nothing, asking: PeerAsking::Nothing,
asking_blocks: Vec::new(), asking_blocks: Vec::new(),
asking_hash: None, asking_hash: None,
asking_private_state: None,
ask_time: Instant::now(), ask_time: Instant::now(),
last_sent_transactions: Default::default(), last_sent_transactions: Default::default(),
last_sent_private_transactions: Default::default(), last_sent_private_transactions: Default::default(),
@ -440,7 +441,7 @@ mod tests {
client_version: ClientVersion::from(""), client_version: ClientVersion::from(""),
}); });
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
SyncPropagator::propagate_proposed_blocks(&mut sync, &mut io, &[block]); SyncPropagator::propagate_proposed_blocks(&mut sync, &mut io, &[block]);
// 1 message should be sent // 1 message should be sent
@ -457,7 +458,7 @@ mod tests {
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
// Try to propagate same transactions for the second time // Try to propagate same transactions for the second time
let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
@ -484,7 +485,7 @@ mod tests {
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
io.chain.insert_transaction_to_queue(); io.chain.insert_transaction_to_queue();
// New block import should not trigger propagation. // New block import should not trigger propagation.
@ -508,7 +509,7 @@ mod tests {
let mut sync = ChainSync::new(SyncConfig::default(), &client, None); let mut sync = ChainSync::new(SyncConfig::default(), &client, None);
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]); sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]);
// Try to propagate same transactions for the second time // Try to propagate same transactions for the second time
@ -529,7 +530,7 @@ mod tests {
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
// should sent some // should sent some
{ {
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
assert_eq!(1, io.packets.len()); assert_eq!(1, io.packets.len());
assert_eq!(1, peer_count); assert_eq!(1, peer_count);
@ -537,7 +538,7 @@ mod tests {
// Insert some more // Insert some more
client.insert_transaction_to_queue(); client.insert_transaction_to_queue();
let (peer_count2, peer_count3) = { let (peer_count2, peer_count3) = {
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
// Propagate new transactions // Propagate new transactions
let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
// And now the peer should have all transactions // And now the peer should have all transactions
@ -563,7 +564,7 @@ mod tests {
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
let stats = sync.transactions_stats(); let stats = sync.transactions_stats();
@ -578,7 +579,7 @@ mod tests {
let mut sync = ChainSync::new(SyncConfig::default(), &client, None); let mut sync = ChainSync::new(SyncConfig::default(), &client, None);
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
// when peer#1 is Geth // when peer#1 is Geth
insert_dummy_peer(&mut sync, 1, block_hash); insert_dummy_peer(&mut sync, 1, block_hash);
@ -608,7 +609,7 @@ mod tests {
let mut sync = ChainSync::new(SyncConfig::default(), &client, None); let mut sync = ChainSync::new(SyncConfig::default(), &client, None);
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
// when peer#1 is Parity, accepting service transactions // when peer#1 is Parity, accepting service transactions
insert_dummy_peer(&mut sync, 1, block_hash); insert_dummy_peer(&mut sync, 1, block_hash);

View File

@ -30,6 +30,7 @@ use super::sync_packet::SyncPacket::{
GetReceiptsPacket, GetReceiptsPacket,
GetSnapshotManifestPacket, GetSnapshotManifestPacket,
GetSnapshotDataPacket, GetSnapshotDataPacket,
GetPrivateStatePacket,
}; };
use super::{ use super::{
@ -99,6 +100,15 @@ impl SyncRequester {
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GetSnapshotManifestPacket, rlp.out()); SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GetSnapshotManifestPacket, rlp.out());
} }
pub fn request_private_state(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, hash: &H256) {
trace!(target: "privatetx", "{} <- GetPrivateStatePacket", peer_id);
let mut rlp = RlpStream::new_list(1);
rlp.append(hash);
SyncRequester::send_request(sync, io, peer_id, PeerAsking::PrivateState, GetPrivateStatePacket, rlp.out());
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
peer.asking_private_state = Some(hash.clone());
}
/// Request headers from a peer by block hash /// Request headers from a peer by block hash
fn request_headers_by_hash(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, h: &H256, count: u64, skip: u64, reverse: bool, set: BlockSet) { fn request_headers_by_hash(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, h: &H256, count: u64, skip: u64, reverse: bool, set: BlockSet) {
trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}, set = {:?}", peer_id, count, h, set); trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}, set = {:?}", peer_id, count, h, set);

View File

@ -43,6 +43,8 @@ use super::sync_packet::SyncPacket::{
GetSnapshotDataPacket, GetSnapshotDataPacket,
SnapshotDataPacket, SnapshotDataPacket,
ConsensusDataPacket, ConsensusDataPacket,
GetPrivateStatePacket,
PrivateStatePacket,
}; };
use super::{ use super::{
@ -98,6 +100,11 @@ impl SyncSupplier {
SyncSupplier::return_snapshot_data, SyncSupplier::return_snapshot_data,
|e| format!("Error sending snapshot data: {:?}", e)), |e| format!("Error sending snapshot data: {:?}", e)),
GetPrivateStatePacket => SyncSupplier::return_rlp(
io, &rlp, peer,
SyncSupplier::return_private_state,
|e| format!("Error sending private state data: {:?}", e)),
StatusPacket => { StatusPacket => {
sync.write().on_packet(io, peer, packet_id, data); sync.write().on_packet(io, peer, packet_id, data);
Ok(()) Ok(())
@ -348,6 +355,26 @@ impl SyncSupplier {
Ok(Some((SnapshotDataPacket.id(), rlp))) Ok(Some((SnapshotDataPacket.id(), rlp)))
} }
/// Respond to GetPrivateStatePacket
fn return_private_state(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
let hash: H256 = r.val_at(0)?;
trace!(target: "privatetx", "{} -> GetPrivateStatePacket {:?}", peer_id, hash);
io.private_state().map_or(Ok(None), |db| {
let state = db.state(&hash);
match state {
Err(err) => {
trace!(target: "privatetx", "Cannot retrieve state from db {:?}", err);
Ok(None)
}
Ok(bytes) => {
let mut rlp = RlpStream::new_list(1);
rlp.append(&bytes);
Ok(Some((PrivateStatePacket.id(), rlp)))
}
}
})
}
fn return_rlp<FRlp, FError>(io: &mut dyn SyncIo, rlp: &Rlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError> fn return_rlp<FRlp, FError>(io: &mut dyn SyncIo, rlp: &Rlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
where FRlp : Fn(&dyn SyncIo, &Rlp, PeerId) -> RlpResponseResult, where FRlp : Fn(&dyn SyncIo, &Rlp, PeerId) -> RlpResponseResult,
FError : FnOnce(network::Error) -> String FError : FnOnce(network::Error) -> String
@ -412,7 +439,7 @@ mod test {
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let io = TestIo::new(&mut client, &ss, &queue, None); let io = TestIo::new(&mut client, &ss, &queue, None, None);
let unknown: H256 = H256::zero(); let unknown: H256 = H256::zero();
let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_hash_req(&unknown, 1, 0, false)), 0); let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_hash_req(&unknown, 1, 0, false)), 0);
@ -470,7 +497,7 @@ mod test {
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let io = TestIo::new(&mut client, &ss, &queue, None); let io = TestIo::new(&mut client, &ss, &queue, None, None);
let small_result = SyncSupplier::return_block_bodies(&io, &Rlp::new(&small_rlp_request.out()), 0); let small_result = SyncSupplier::return_block_bodies(&io, &Rlp::new(&small_rlp_request.out()), 0);
let small_result = small_result.unwrap().unwrap().1; let small_result = small_result.unwrap().unwrap().1;
@ -487,7 +514,7 @@ mod test {
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let sync = dummy_sync_with_peer(H256::zero(), &client); let sync = dummy_sync_with_peer(H256::zero(), &client);
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let mut node_list = RlpStream::new_list(3); let mut node_list = RlpStream::new_list(3);
node_list.append(&H256::from_str("0000000000000000000000000000000000000000000000005555555555555555").unwrap()); node_list.append(&H256::from_str("0000000000000000000000000000000000000000000000005555555555555555").unwrap());
@ -518,7 +545,7 @@ mod test {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let io = TestIo::new(&mut client, &ss, &queue, None); let io = TestIo::new(&mut client, &ss, &queue, None, None);
let result = SyncSupplier::return_receipts(&io, &Rlp::new(&[0xc0]), 0); let result = SyncSupplier::return_receipts(&io, &Rlp::new(&[0xc0]), 0);
@ -531,7 +558,7 @@ mod test {
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let sync = dummy_sync_with_peer(H256::zero(), &client); let sync = dummy_sync_with_peer(H256::zero(), &client);
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None, None);
let mut receipt_list = RlpStream::new_list(4); let mut receipt_list = RlpStream::new_list(4);
receipt_list.append(&H256::from_str("0000000000000000000000000000000000000000000000005555555555555555").unwrap()); receipt_list.append(&H256::from_str("0000000000000000000000000000000000000000000000005555555555555555").unwrap());

View File

@ -55,6 +55,8 @@ enum_from_primitive! {
ConsensusDataPacket = 0x15, ConsensusDataPacket = 0x15,
PrivateTransactionPacket = 0x16, PrivateTransactionPacket = 0x16,
SignedPrivateTransactionPacket = 0x17, SignedPrivateTransactionPacket = 0x17,
GetPrivateStatePacket = 0x18,
PrivateStatePacket = 0x19,
} }
} }
@ -94,7 +96,9 @@ impl PacketInfo for SyncPacket {
SnapshotDataPacket | SnapshotDataPacket |
ConsensusDataPacket | ConsensusDataPacket |
PrivateTransactionPacket | PrivateTransactionPacket |
SignedPrivateTransactionPacket SignedPrivateTransactionPacket |
GetPrivateStatePacket |
PrivateStatePacket
=> WARP_SYNC_PROTOCOL_ID, => WARP_SYNC_PROTOCOL_ID,
} }

View File

@ -35,6 +35,7 @@ extern crate keccak_hash as hash;
extern crate parity_bytes as bytes; extern crate parity_bytes as bytes;
extern crate parity_runtime; extern crate parity_runtime;
extern crate parking_lot; extern crate parking_lot;
extern crate ethcore_private_tx;
extern crate rand; extern crate rand;
extern crate rlp; extern crate rlp;
extern crate triehash_ethereum; extern crate triehash_ethereum;
@ -43,7 +44,6 @@ extern crate futures;
extern crate ethcore_light as light; extern crate ethcore_light as light;
#[cfg(test)] extern crate env_logger; #[cfg(test)] extern crate env_logger;
#[cfg(test)] extern crate ethcore_private_tx;
#[cfg(test)] extern crate kvdb_memorydb; #[cfg(test)] extern crate kvdb_memorydb;
#[cfg(test)] extern crate rustc_hex; #[cfg(test)] extern crate rustc_hex;
#[cfg(test)] extern crate rand_xorshift; #[cfg(test)] extern crate rand_xorshift;

View File

@ -26,6 +26,9 @@ pub trait PrivateTxHandler: Send + Sync + 'static {
/// Function called on new signed private transaction received. /// Function called on new signed private transaction received.
/// Returns the hash of the imported transaction /// Returns the hash of the imported transaction
fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<H256, String>; fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<H256, String>;
/// Function called when requested private state retrieved from peer and saved to DB.
fn private_state_synced(&self, hash: &H256) -> Result<(), String>;
} }
/// Nonoperative private transaction handler. /// Nonoperative private transaction handler.
@ -39,6 +42,10 @@ impl PrivateTxHandler for NoopPrivateTxHandler {
fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<H256, String> { fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<H256, String> {
Ok(H256::zero()) Ok(H256::zero())
} }
fn private_state_synced(&self, _hash: &H256) -> Result<(), String> {
Ok(())
}
} }
/// Simple private transaction handler. Used for tests. /// Simple private transaction handler. Used for tests.
@ -48,6 +55,8 @@ pub struct SimplePrivateTxHandler {
pub txs: Mutex<Vec<Vec<u8>>>, pub txs: Mutex<Vec<Vec<u8>>>,
/// imported signed private transactions /// imported signed private transactions
pub signed_txs: Mutex<Vec<Vec<u8>>>, pub signed_txs: Mutex<Vec<Vec<u8>>>,
/// synced private state hash
pub synced_hash: Mutex<H256>,
} }
impl PrivateTxHandler for SimplePrivateTxHandler { impl PrivateTxHandler for SimplePrivateTxHandler {
@ -60,4 +69,9 @@ impl PrivateTxHandler for SimplePrivateTxHandler {
self.signed_txs.lock().push(rlp.to_vec()); self.signed_txs.lock().push(rlp.to_vec());
Ok(H256::zero()) Ok(H256::zero())
} }
fn private_state_synced(&self, hash: &H256) -> Result<(), String> {
*self.synced_hash.lock() = *hash;
Ok(())
}
} }

View File

@ -14,12 +14,14 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>. // along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::collections::HashMap; use std::collections::HashMap;
use chain::sync_packet::{PacketInfo, SyncPacket}; use chain::sync_packet::{PacketInfo, SyncPacket};
use network::{NetworkContext, PeerId, PacketId, Error, SessionInfo, ProtocolId}; use network::{NetworkContext, PeerId, PacketId, Error, SessionInfo, ProtocolId};
use network::client_version::ClientVersion; use network::client_version::ClientVersion;
use bytes::Bytes; use bytes::Bytes;
use client_traits::BlockChainClient; use client_traits::BlockChainClient;
use ethcore_private_tx::PrivateStateDB;
use types::BlockNumber; use types::BlockNumber;
use ethcore::snapshot::SnapshotService; use ethcore::snapshot::SnapshotService;
use parking_lot::RwLock; use parking_lot::RwLock;
@ -40,6 +42,8 @@ pub trait SyncIo {
fn chain(&self) -> &dyn BlockChainClient; fn chain(&self) -> &dyn BlockChainClient;
/// Get the snapshot service. /// Get the snapshot service.
fn snapshot_service(&self) -> &dyn SnapshotService; fn snapshot_service(&self) -> &dyn SnapshotService;
/// Get the private state wrapper
fn private_state(&self) -> Option<Arc<PrivateStateDB>>;
/// Returns peer version identifier /// Returns peer version identifier
fn peer_version(&self, peer_id: PeerId) -> ClientVersion { fn peer_version(&self, peer_id: PeerId) -> ClientVersion {
ClientVersion::from(peer_id.to_string()) ClientVersion::from(peer_id.to_string())
@ -68,6 +72,7 @@ pub struct NetSyncIo<'s> {
chain: &'s dyn BlockChainClient, chain: &'s dyn BlockChainClient,
snapshot_service: &'s dyn SnapshotService, snapshot_service: &'s dyn SnapshotService,
chain_overlay: &'s RwLock<HashMap<BlockNumber, Bytes>>, chain_overlay: &'s RwLock<HashMap<BlockNumber, Bytes>>,
private_state: Option<Arc<PrivateStateDB>>,
} }
impl<'s> NetSyncIo<'s> { impl<'s> NetSyncIo<'s> {
@ -75,12 +80,14 @@ impl<'s> NetSyncIo<'s> {
pub fn new(network: &'s dyn NetworkContext, pub fn new(network: &'s dyn NetworkContext,
chain: &'s dyn BlockChainClient, chain: &'s dyn BlockChainClient,
snapshot_service: &'s dyn SnapshotService, snapshot_service: &'s dyn SnapshotService,
chain_overlay: &'s RwLock<HashMap<BlockNumber, Bytes>>) -> NetSyncIo<'s> { chain_overlay: &'s RwLock<HashMap<BlockNumber, Bytes>>,
private_state: Option<Arc<PrivateStateDB>>) -> NetSyncIo<'s> {
NetSyncIo { NetSyncIo {
network: network, network,
chain: chain, chain,
snapshot_service: snapshot_service, snapshot_service,
chain_overlay: chain_overlay, chain_overlay,
private_state,
} }
} }
} }
@ -114,6 +121,10 @@ impl<'s> SyncIo for NetSyncIo<'s> {
self.snapshot_service self.snapshot_service
} }
fn private_state(&self) -> Option<Arc<PrivateStateDB>> {
self.private_state.clone()
}
fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo> { fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo> {
self.network.session_info(peer_id) self.network.session_info(peer_id)
} }

View File

@ -27,12 +27,13 @@ use ethcore::client::{TestBlockChainClient, Client as EthcoreClient,
ClientConfig, ChainNotify, NewBlocks, ChainMessageType, ClientIoMessage}; ClientConfig, ChainNotify, NewBlocks, ChainMessageType, ClientIoMessage};
use ethcore::snapshot::SnapshotService; use ethcore::snapshot::SnapshotService;
use ethcore::spec::{self, Spec}; use ethcore::spec::{self, Spec};
use ethcore_private_tx::PrivateStateDB;
use ethcore::miner::Miner; use ethcore::miner::Miner;
use ethcore::test_helpers; use ethcore::test_helpers;
use sync_io::SyncIo; use sync_io::SyncIo;
use io::{IoChannel, IoContext, IoHandler}; use io::{IoChannel, IoContext, IoHandler};
use api::WARP_SYNC_PROTOCOL_ID; use api::WARP_SYNC_PROTOCOL_ID;
use chain::{ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3}; use chain::{ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_4};
use chain::sync_packet::{PacketInfo, SyncPacket}; use chain::sync_packet::{PacketInfo, SyncPacket};
use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket}; use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket};
@ -60,20 +61,28 @@ pub struct TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
pub to_disconnect: HashSet<PeerId>, pub to_disconnect: HashSet<PeerId>,
pub packets: Vec<TestPacket>, pub packets: Vec<TestPacket>,
pub peers_info: HashMap<PeerId, String>, pub peers_info: HashMap<PeerId, String>,
pub private_state_db: Option<Arc<PrivateStateDB>>,
overlay: RwLock<HashMap<BlockNumber, Bytes>>, overlay: RwLock<HashMap<BlockNumber, Bytes>>,
} }
impl<'p, C> TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { impl<'p, C> TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
pub fn new(chain: &'p C, ss: &'p TestSnapshotService, queue: &'p RwLock<VecDeque<TestPacket>>, sender: Option<PeerId>) -> TestIo<'p, C> { pub fn new(
chain: &'p C,
ss: &'p TestSnapshotService,
queue: &'p RwLock<VecDeque<TestPacket>>,
sender: Option<PeerId>,
private_state_db: Option<Arc<PrivateStateDB>>
) -> TestIo<'p, C> {
TestIo { TestIo {
chain: chain, chain,
snapshot_service: ss, snapshot_service: ss,
queue: queue, queue,
sender: sender, sender,
to_disconnect: HashSet::new(), to_disconnect: HashSet::new(),
overlay: RwLock::new(HashMap::new()),
packets: Vec::new(), packets: Vec::new(),
peers_info: HashMap::new(), peers_info: HashMap::new(),
private_state_db,
overlay: RwLock::new(HashMap::new()),
} }
} }
} }
@ -131,6 +140,10 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
self.snapshot_service self.snapshot_service
} }
fn private_state(&self) -> Option<Arc<PrivateStateDB>> {
self.private_state_db.clone()
}
fn peer_session_info(&self, _peer_id: PeerId) -> Option<SessionInfo> { fn peer_session_info(&self, _peer_id: PeerId) -> Option<SessionInfo> {
None None
} }
@ -140,7 +153,7 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
} }
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 { fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 {
if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_3.0 } else { self.eth_protocol_version(peer_id) } if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_4.0 } else { self.eth_protocol_version(peer_id) }
} }
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> { fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
@ -220,6 +233,7 @@ pub struct EthPeer<C> where C: FlushingBlockChainClient {
pub private_tx_handler: Arc<SimplePrivateTxHandler>, pub private_tx_handler: Arc<SimplePrivateTxHandler>,
pub io_queue: RwLock<VecDeque<ChainMessageType>>, pub io_queue: RwLock<VecDeque<ChainMessageType>>,
new_blocks_queue: RwLock<VecDeque<NewBlockMessage>>, new_blocks_queue: RwLock<VecDeque<NewBlockMessage>>,
private_state_db: RwLock<Option<Arc<PrivateStateDB>>>,
} }
impl<C> EthPeer<C> where C: FlushingBlockChainClient { impl<C> EthPeer<C> where C: FlushingBlockChainClient {
@ -232,18 +246,20 @@ impl<C> EthPeer<C> where C: FlushingBlockChainClient {
} }
fn process_io_message(&self, message: ChainMessageType) { fn process_io_message(&self, message: ChainMessageType) {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None, self.private_state_db());
match message { match message {
ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data), ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data),
ChainMessageType::PrivateTransaction(transaction_hash, data) => ChainMessageType::PrivateTransaction(transaction_hash, data) =>
self.sync.write().propagate_private_transaction(&mut io, transaction_hash, PrivateTransactionPacket, data), self.sync.write().propagate_private_transaction(&mut io, transaction_hash, PrivateTransactionPacket, data),
ChainMessageType::SignedPrivateTransaction(transaction_hash, data) => ChainMessageType::SignedPrivateTransaction(transaction_hash, data) =>
self.sync.write().propagate_private_transaction(&mut io, transaction_hash, SignedPrivateTransactionPacket, data), self.sync.write().propagate_private_transaction(&mut io, transaction_hash, SignedPrivateTransactionPacket, data),
ChainMessageType::PrivateStateRequest(hash) =>
self.sync.write().request_private_state(&mut io, &hash),
} }
} }
fn process_new_block_message(&self, message: NewBlockMessage) { fn process_new_block_message(&self, message: NewBlockMessage) {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None, self.private_state_db());
self.sync.write().chain_new_blocks( self.sync.write().chain_new_blocks(
&mut io, &mut io,
&message.imported, &message.imported,
@ -254,6 +270,15 @@ impl<C> EthPeer<C> where C: FlushingBlockChainClient {
&message.proposed &message.proposed
); );
} }
pub fn set_private_state_db(&self, db: Arc<PrivateStateDB>) {
*self.private_state_db.write() = Some(db);
}
fn private_state_db(&self) -> Option<Arc<PrivateStateDB>> {
let db = self.private_state_db.read();
db.clone()
}
} }
impl<C: FlushingBlockChainClient> Peer for EthPeer<C> { impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
@ -265,17 +290,18 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
&*self.chain, &*self.chain,
&self.snapshot_service, &self.snapshot_service,
&self.queue, &self.queue,
Some(other)), Some(other),
self.private_state_db()),
other); other);
} }
fn on_disconnect(&self, other: PeerId) { fn on_disconnect(&self, other: PeerId) {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(other)); let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(other), self.private_state_db());
self.sync.write().on_peer_aborting(&mut io, other); self.sync.write().on_peer_aborting(&mut io, other);
} }
fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> { fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(from)); let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(from), self.private_state_db());
SyncSupplier::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data); SyncSupplier::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data);
self.chain.flush(); self.chain.flush();
io.to_disconnect.clone() io.to_disconnect.clone()
@ -291,7 +317,7 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
} }
fn sync_step(&self) { fn sync_step(&self) {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None, self.private_state_db());
self.chain.flush(); self.chain.flush();
self.sync.write().maintain_peers(&mut io); self.sync.write().maintain_peers(&mut io);
self.sync.write().maintain_sync(&mut io); self.sync.write().maintain_sync(&mut io);
@ -300,7 +326,7 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
} }
fn restart_sync(&self) { fn restart_sync(&self) {
self.sync.write().restart(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); self.sync.write().restart(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None, self.private_state_db()));
} }
fn process_all_io_messages(&self) { fn process_all_io_messages(&self) {
@ -357,6 +383,7 @@ impl TestNet<EthPeer<TestBlockChainClient>> {
private_tx_handler, private_tx_handler,
io_queue: RwLock::new(VecDeque::new()), io_queue: RwLock::new(VecDeque::new()),
new_blocks_queue: RwLock::new(VecDeque::new()), new_blocks_queue: RwLock::new(VecDeque::new()),
private_state_db: RwLock::new(None),
})); }));
} }
net net
@ -410,6 +437,7 @@ impl TestNet<EthPeer<EthcoreClient>> {
private_tx_handler, private_tx_handler,
io_queue: RwLock::new(VecDeque::new()), io_queue: RwLock::new(VecDeque::new()),
new_blocks_queue: RwLock::new(VecDeque::new()), new_blocks_queue: RwLock::new(VecDeque::new()),
private_state_db: RwLock::new(None),
}); });
peer.chain.add_notify(peer.clone()); peer.chain.add_notify(peer.clone());
//private_provider.add_notify(peer.clone()); //private_provider.add_notify(peer.clone());
@ -508,7 +536,7 @@ impl<P> TestNet<P> where P: Peer {
impl<C: FlushingBlockChainClient> TestNet<EthPeer<C>> { impl<C: FlushingBlockChainClient> TestNet<EthPeer<C>> {
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
let peer = &mut self.peers[peer_id]; let peer = &mut self.peers[peer_id];
peer.sync.write().chain_new_blocks(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None), &[], &[], &[], &[], &[], &[]); peer.sync.write().chain_new_blocks(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None, None), &[], &[], &[], &[], &[], &[]);
} }
} }

View File

@ -26,7 +26,7 @@ use ethcore::{
client::ClientIoMessage, client::ClientIoMessage,
miner::{self, MinerService}, miner::{self, MinerService},
spec::Spec, spec::Spec,
test_helpers::push_block_with_transactions, test_helpers::{push_block_with_transactions, new_db},
}; };
use ethcore_private_tx::{Provider, ProviderConfig, NoopEncryptor, Importer, SignedPrivateTransaction, StoringKeyProvider}; use ethcore_private_tx::{Provider, ProviderConfig, NoopEncryptor, Importer, SignedPrivateTransaction, StoringKeyProvider};
use ethkey::KeyPair; use ethkey::KeyPair;
@ -73,16 +73,18 @@ fn send_private_transaction() {
validator_accounts: vec![s1.address()], validator_accounts: vec![s1.address()],
signer_account: None, signer_account: None,
logs_path: None, logs_path: None,
use_offchain_storage: false,
}; };
let signer_config = ProviderConfig{ let signer_config = ProviderConfig{
validator_accounts: Vec::new(), validator_accounts: Vec::new(),
signer_account: Some(s0.address()), signer_account: Some(s0.address()),
logs_path: None, logs_path: None,
use_offchain_storage: false,
}; };
let private_keys = Arc::new(StoringKeyProvider::default()); let private_keys = Arc::new(StoringKeyProvider::default());
let db = new_db();
let pm0 = Arc::new(Provider::new( let pm0 = Arc::new(Provider::new(
client0.clone(), client0.clone(),
net.peer(0).miner.clone(), net.peer(0).miner.clone(),
@ -91,6 +93,7 @@ fn send_private_transaction() {
signer_config, signer_config,
IoChannel::to_handler(Arc::downgrade(&io_handler0)), IoChannel::to_handler(Arc::downgrade(&io_handler0)),
private_keys.clone(), private_keys.clone(),
db.key_value().clone(),
)); ));
pm0.add_notify(net.peers[0].clone()); pm0.add_notify(net.peers[0].clone());
@ -102,6 +105,7 @@ fn send_private_transaction() {
validator_config, validator_config,
IoChannel::to_handler(Arc::downgrade(&io_handler1)), IoChannel::to_handler(Arc::downgrade(&io_handler1)),
private_keys.clone(), private_keys.clone(),
db.key_value().clone(),
)); ));
pm1.add_notify(net.peers[1].clone()); pm1.add_notify(net.peers[1].clone());
@ -157,3 +161,134 @@ fn send_private_transaction() {
let local_transactions = net.peer(0).miner.local_transactions(); let local_transactions = net.peer(0).miner.local_transactions();
assert_eq!(local_transactions.len(), 1); assert_eq!(local_transactions.len(), 1);
} }
#[test]
fn sync_private_state() {
// Setup two clients
let s0 = KeyPair::from_secret_slice(&keccak("1").as_bytes()).unwrap();
let s1 = KeyPair::from_secret_slice(&keccak("0").as_bytes()).unwrap();
let signer = Arc::new(ethcore_private_tx::KeyPairSigner(vec![s0.clone(), s1.clone()]));
let mut net = TestNet::with_spec(2, SyncConfig::default(), seal_spec);
let client0 = net.peer(0).chain.clone();
let client1 = net.peer(1).chain.clone();
let io_handler0: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler::new(net.peer(0).chain.clone()));
let io_handler1: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler::new(net.peer(1).chain.clone()));
net.peer(0).miner.set_author(miner::Author::Sealer(signer::from_keypair(s0.clone())));
net.peer(1).miner.set_author(miner::Author::Sealer(signer::from_keypair(s1.clone())));
net.peer(0).chain.engine().register_client(Arc::downgrade(&net.peer(0).chain) as _);
net.peer(1).chain.engine().register_client(Arc::downgrade(&net.peer(1).chain) as _);
net.peer(0).chain.set_io_channel(IoChannel::to_handler(Arc::downgrade(&io_handler0)));
net.peer(1).chain.set_io_channel(IoChannel::to_handler(Arc::downgrade(&io_handler1)));
let (address, _) = contract_address(CreateContractAddress::FromSenderAndNonce, &s0.address(), &0.into(), &[]);
let chain_id = client0.signing_chain_id();
// Exhange statuses
net.sync();
// Setup private providers
let validator_config = ProviderConfig{
validator_accounts: vec![s1.address()],
signer_account: None,
logs_path: None,
use_offchain_storage: true,
};
let signer_config = ProviderConfig{
validator_accounts: Vec::new(),
signer_account: Some(s0.address()),
logs_path: None,
use_offchain_storage: true,
};
let private_keys = Arc::new(StoringKeyProvider::default());
let db0 = new_db();
let pm0 = Arc::new(Provider::new(
client0.clone(),
net.peer(0).miner.clone(),
signer.clone(),
Box::new(NoopEncryptor::default()),
signer_config,
IoChannel::to_handler(Arc::downgrade(&io_handler0)),
private_keys.clone(),
db0.key_value().clone(),
));
pm0.add_notify(net.peers[0].clone());
let db1 = new_db();
let pm1 = Arc::new(Provider::new(
client1.clone(),
net.peer(1).miner.clone(),
signer.clone(),
Box::new(NoopEncryptor::default()),
validator_config,
IoChannel::to_handler(Arc::downgrade(&io_handler1)),
private_keys.clone(),
db1.key_value().clone(),
));
pm1.add_notify(net.peers[1].clone());
net.peer(0).set_private_state_db(pm0.private_state_db());
net.peer(1).set_private_state_db(pm1.private_state_db());
// Create and deploy contract
let private_contract_test = "6060604052341561000f57600080fd5b60d88061001d6000396000f30060606040526000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff1680630c55699c146046578063bc64b76d14607457600080fd5b3415605057600080fd5b60566098565b60405180826000191660001916815260200191505060405180910390f35b3415607e57600080fd5b6096600480803560001916906020019091905050609e565b005b60005481565b8060008160001916905550505600a165627a7a723058206acbdf4b15ca4c2d43e1b1879b830451a34f1e9d02ff1f2f394d8d857e79d2080029".from_hex().unwrap();
let mut private_create_tx = Transaction::default();
private_create_tx.action = Action::Create;
private_create_tx.data = private_contract_test;
private_create_tx.gas = 200000.into();
let private_create_tx_signed = private_create_tx.sign(&s0.secret(), None);
let validators = vec![s1.address()];
let (public_tx, _) = pm0.public_creation_transaction(BlockId::Latest, &private_create_tx_signed, &validators, 0.into()).unwrap();
let public_tx = public_tx.sign(&s0.secret(), chain_id);
let public_tx_copy = public_tx.clone();
push_block_with_transactions(&client0, &[public_tx]);
push_block_with_transactions(&client1, &[public_tx_copy]);
net.sync();
//Create private transaction for modifying state
let mut private_tx = Transaction::default();
private_tx.action = Action::Call(address.clone());
private_tx.data = "bc64b76d2a00000000000000000000000000000000000000000000000000000000000000".from_hex().unwrap(); //setX(42)
private_tx.gas = 120000.into();
private_tx.nonce = 1.into();
let private_tx = private_tx.sign(&s0.secret(), None);
let _create_res = pm0.create_private_transaction(private_tx);
//send private transaction message to validator
net.sync();
let validator_handler = net.peer(1).private_tx_handler.clone();
let received_private_transactions = validator_handler.txs.lock().clone();
assert_eq!(received_private_transactions.len(), 1);
//process received private transaction message
let private_transaction = received_private_transactions[0].clone();
let _import_res = pm1.import_private_transaction(&private_transaction);
// Second node requests the state from the first one
net.sync();
let synced_hash = validator_handler.synced_hash.lock().clone();
assert!(pm1.private_state_synced(&synced_hash).is_ok());
// Second node has private state up-to-date and can verify the private transaction
// Further should work the standard flow
net.sync();
let sender_handler = net.peer(0).private_tx_handler.clone();
let received_signed_private_transactions = sender_handler.signed_txs.lock().clone();
assert_eq!(received_signed_private_transactions.len(), 1);
//process signed response
let signed_private_transaction = received_signed_private_transactions[0].clone();
assert!(pm0.import_signed_private_transaction(&signed_private_transaction).is_ok());
let signature: SignedPrivateTransaction = Rlp::new(&signed_private_transaction).as_val().unwrap();
assert!(pm0.process_signature(&signature).is_ok());
let local_transactions = net.peer(0).miner.local_transactions();
assert_eq!(local_transactions.len(), 1);
}

View File

@ -357,6 +357,10 @@ usage! {
"--private-tx-enabled", "--private-tx-enabled",
"Enable private transactions.", "Enable private transactions.",
FLAG flag_private_state_offchain: (bool) = false, or |c: &Config| c.private_tx.as_ref()?.state_offchain,
"--private-state-offchain",
"Store private state offchain (in the local DB).",
ARG arg_private_signer: (Option<String>) = None, or |c: &Config| c.private_tx.as_ref()?.signer.clone(), ARG arg_private_signer: (Option<String>) = None, or |c: &Config| c.private_tx.as_ref()?.signer.clone(),
"--private-signer=[ACCOUNT]", "--private-signer=[ACCOUNT]",
"Specify the account for signing public transaction created upon verified private transaction.", "Specify the account for signing public transaction created upon verified private transaction.",
@ -1200,6 +1204,7 @@ struct Account {
#[serde(deny_unknown_fields)] #[serde(deny_unknown_fields)]
struct PrivateTransactions { struct PrivateTransactions {
enabled: Option<bool>, enabled: Option<bool>,
state_offchain: Option<bool>,
signer: Option<String>, signer: Option<String>,
validators: Option<Vec<String>>, validators: Option<Vec<String>>,
account: Option<String>, account: Option<String>,
@ -1766,6 +1771,7 @@ mod tests {
// -- Private Transactions Options // -- Private Transactions Options
flag_private_enabled: true, flag_private_enabled: true,
flag_private_state_offchain: false,
arg_private_signer: Some("0xdeadbeefcafe0000000000000000000000000000".into()), arg_private_signer: Some("0xdeadbeefcafe0000000000000000000000000000".into()),
arg_private_validators: Some("0xdeadbeefcafe0000000000000000000000000000".into()), arg_private_validators: Some("0xdeadbeefcafe0000000000000000000000000000".into()),
arg_private_passwords: Some("~/.safe/password.file".into()), arg_private_passwords: Some("~/.safe/password.file".into()),

View File

@ -926,7 +926,8 @@ impl Configuration {
logs_path: match self.args.flag_private_enabled { logs_path: match self.args.flag_private_enabled {
true => Some(dirs.base), true => Some(dirs.base),
false => None, false => None,
} },
use_offchain_storage: self.args.flag_private_state_offchain,
}; };
let encryptor_conf = EncryptorConfig { let encryptor_conf = EncryptorConfig {

View File

@ -42,10 +42,18 @@ pub const TO_V12: ChangeColumns = ChangeColumns {
version: 12, version: 12,
}; };
/// The migration from v12 to v14.
/// Adds a column for private transactions state storage.
pub const TO_V14: ChangeColumns = ChangeColumns {
pre_columns: Some(8),
post_columns: Some(9),
version: 14,
};
/// Database is assumed to be at default version, when no version file is found. /// Database is assumed to be at default version, when no version file is found.
const DEFAULT_VERSION: u32 = 5; const DEFAULT_VERSION: u32 = 5;
/// Current version of database models. /// Current version of database models.
const CURRENT_VERSION: u32 = 13; const CURRENT_VERSION: u32 = 14;
/// A version of database at which blooms-db was introduced /// A version of database at which blooms-db was introduced
const BLOOMS_DB_VERSION: u32 = 13; const BLOOMS_DB_VERSION: u32 = 13;
/// Defines how many items are migrated to the new version of database at once. /// Defines how many items are migrated to the new version of database at once.
@ -147,6 +155,7 @@ fn consolidated_database_migrations(compaction_profile: &CompactionProfile) -> R
let mut manager = MigrationManager::new(default_migration_settings(compaction_profile)); let mut manager = MigrationManager::new(default_migration_settings(compaction_profile));
manager.add_migration(TO_V11).map_err(|_| Error::MigrationImpossible)?; manager.add_migration(TO_V11).map_err(|_| Error::MigrationImpossible)?;
manager.add_migration(TO_V12).map_err(|_| Error::MigrationImpossible)?; manager.add_migration(TO_V12).map_err(|_| Error::MigrationImpossible)?;
manager.add_migration(TO_V14).map_err(|_| Error::MigrationImpossible)?;
Ok(manager) Ok(manager)
} }

View File

@ -19,6 +19,7 @@ use std::sync::{Arc, mpsc};
use client_traits::BlockChainClient; use client_traits::BlockChainClient;
use sync::{self, SyncConfig, NetworkConfiguration, Params, ConnectionFilter}; use sync::{self, SyncConfig, NetworkConfiguration, Params, ConnectionFilter};
use ethcore::snapshot::SnapshotService; use ethcore::snapshot::SnapshotService;
use ethcore_private_tx::PrivateStateDB;
use light::Provider; use light::Provider;
use parity_runtime::Executor; use parity_runtime::Executor;
@ -40,6 +41,7 @@ pub fn sync(
chain: Arc<BlockChainClient>, chain: Arc<BlockChainClient>,
snapshot_service: Arc<SnapshotService>, snapshot_service: Arc<SnapshotService>,
private_tx_handler: Option<Arc<PrivateTxHandler>>, private_tx_handler: Option<Arc<PrivateTxHandler>>,
private_state: Option<Arc<PrivateStateDB>>,
provider: Arc<Provider>, provider: Arc<Provider>,
_log_settings: &LogConfig, _log_settings: &LogConfig,
connection_filter: Option<Arc<ConnectionFilter>>, connection_filter: Option<Arc<ConnectionFilter>>,
@ -51,6 +53,7 @@ pub fn sync(
provider, provider,
snapshot_service, snapshot_service,
private_tx_handler, private_tx_handler,
private_state,
network_config, network_config,
}, },
connection_filter)?; connection_filter)?;

View File

@ -626,9 +626,9 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
.map_err(|e| format!("Stratum start error: {:?}", e))?; .map_err(|e| format!("Stratum start error: {:?}", e))?;
} }
let private_tx_sync: Option<Arc<PrivateTxHandler>> = match cmd.private_tx_enabled { let (private_tx_sync, private_state) = match cmd.private_tx_enabled {
true => Some(private_tx_service.clone() as Arc<PrivateTxHandler>), true => (Some(private_tx_service.clone() as Arc<PrivateTxHandler>), Some(private_tx_provider.private_state_db())),
false => None, false => (None, None),
}; };
// create sync object // create sync object
@ -639,6 +639,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
client.clone(), client.clone(),
snapshot_service.clone(), snapshot_service.clone(),
private_tx_sync, private_tx_sync,
private_state,
client.clone(), client.clone(),
&cmd.logger_config, &cmd.logger_config,
connection_filter.clone().map(|f| f as Arc<::sync::ConnectionFilter + 'static>), connection_filter.clone().map(|f| f as Arc<::sync::ConnectionFilter + 'static>),

View File

@ -24,6 +24,10 @@ use ethcore_private_tx::{TransactionLog as EthTransactionLog, ValidatorLog as Et
pub enum Status { pub enum Status {
/// Private tx was created but no validation received yet /// Private tx was created but no validation received yet
Created, Created,
/// Private state not found locally and being retrived from peers
PrivateStateSync,
/// Retrieval of the private state failed, transaction not created
PrivateStateSyncFailed,
/// Several validators (but not all) validated the transaction /// Several validators (but not all) validated the transaction
Validating, Validating,
/// All validators validated the private tx /// All validators validated the private tx
@ -35,6 +39,8 @@ impl From<EthStatus> for Status {
fn from(c: EthStatus) -> Self { fn from(c: EthStatus) -> Self {
match c { match c {
EthStatus::Created => Status::Created, EthStatus::Created => Status::Created,
EthStatus::PrivateStateSync => Status::PrivateStateSync,
EthStatus::PrivateStateSyncFailed => Status::PrivateStateSyncFailed,
EthStatus::Validating => Status::Validating, EthStatus::Validating => Status::Validating,
EthStatus::Deployed => Status::Deployed, EthStatus::Deployed => Status::Deployed,
} }