Merge branch 'master' into client-ipc-refact
This commit is contained in:
@@ -37,10 +37,7 @@ use util::rlp::{RlpStream, Rlp, UntrustedRlp};
|
||||
use util::journaldb;
|
||||
use util::journaldb::JournalDB;
|
||||
use util::kvdb::*;
|
||||
use util::Itertools;
|
||||
use util::PerfTimer;
|
||||
use util::View;
|
||||
use util::Stream;
|
||||
use util::{Applyable, Stream, View, PerfTimer, Itertools, Colour};
|
||||
|
||||
// other
|
||||
use views::BlockView;
|
||||
@@ -63,8 +60,7 @@ use block_queue::{BlockQueue, BlockQueueInfo};
|
||||
use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute};
|
||||
use client::{BlockID, TransactionID, UncleID, TraceId, ClientConfig,
|
||||
DatabaseCompactionProfile, BlockChainClient, MiningBlockChainClient,
|
||||
TraceFilter, CallAnalytics, BlockImportError, TransactionImportError,
|
||||
TransactionImportResult, Mode};
|
||||
TraceFilter, CallAnalytics, BlockImportError, Mode};
|
||||
use client::Error as ClientError;
|
||||
use env_info::EnvInfo;
|
||||
use executive::{Executive, Executed, TransactOptions, contract_address};
|
||||
@@ -72,7 +68,7 @@ use receipt::LocalizedReceipt;
|
||||
use trace::{TraceDB, ImportRequest as TraceImportRequest, LocalizedTrace, Database as TraceDatabase};
|
||||
use trace;
|
||||
use evm::Factory as EvmFactory;
|
||||
use miner::{Miner, MinerService, AccountDetails};
|
||||
use miner::{Miner, MinerService};
|
||||
use util::TrieFactory;
|
||||
use ipc::IpcConfig;
|
||||
use ipc::binary::{BinaryConvertError};
|
||||
@@ -147,6 +143,7 @@ pub struct Client {
|
||||
liveness: AtomicBool,
|
||||
io_channel: IoChannel<NetSyncMessage>,
|
||||
queue_transactions: AtomicUsize,
|
||||
previous_enode: Mutex<Option<String>>,
|
||||
}
|
||||
|
||||
const HISTORY: u64 = 1200;
|
||||
@@ -232,6 +229,7 @@ impl Client {
|
||||
miner: miner,
|
||||
io_channel: message_channel,
|
||||
queue_transactions: AtomicUsize::new(0),
|
||||
previous_enode: Mutex::new(None),
|
||||
};
|
||||
Ok(Arc::new(client))
|
||||
}
|
||||
@@ -437,12 +435,8 @@ impl Client {
|
||||
pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize {
|
||||
let _timer = PerfTimer::new("import_queued_transactions");
|
||||
self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
|
||||
let fetch_account = |a: &Address| AccountDetails {
|
||||
nonce: self.latest_nonce(a),
|
||||
balance: self.latest_balance(a),
|
||||
};
|
||||
let tx = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
|
||||
let results = self.miner.import_transactions(self, tx, fetch_account);
|
||||
let txs = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
|
||||
let results = self.miner.import_external_transactions(self, txs);
|
||||
results.len()
|
||||
}
|
||||
|
||||
@@ -591,6 +585,18 @@ impl Client {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify us that the network has been started.
|
||||
pub fn network_started(&self, url: &String) {
|
||||
let mut previous_enode = self.previous_enode.lock().unwrap();
|
||||
if let Some(ref u) = *previous_enode {
|
||||
if u == url {
|
||||
return;
|
||||
}
|
||||
}
|
||||
*previous_enode = Some(url.clone());
|
||||
info!(target: "mode", "Public node URL: {}", url.apply(Colour::White.bold()));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Ipc)]
|
||||
@@ -891,18 +897,6 @@ impl BlockChainClient for Client {
|
||||
self.build_last_hashes(self.chain.best_block_hash())
|
||||
}
|
||||
|
||||
fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, TransactionImportError>> {
|
||||
let fetch_account = |a: &Address| AccountDetails {
|
||||
nonce: self.latest_nonce(a),
|
||||
balance: self.latest_balance(a),
|
||||
};
|
||||
|
||||
self.miner.import_transactions(self, transactions, &fetch_account)
|
||||
.into_iter()
|
||||
.map(|res| res.map_err(|e| e.into()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn queue_transactions(&self, transactions: Vec<Bytes>) {
|
||||
if self.queue_transactions.load(AtomicOrdering::Relaxed) > MAX_TX_QUEUE_SIZE {
|
||||
debug!("Ignoring {} transactions: queue is full", transactions.len());
|
||||
|
||||
@@ -48,7 +48,8 @@ use trace::LocalizedTrace;
|
||||
use evm::Factory as EvmFactory;
|
||||
pub use types::call_analytics::CallAnalytics;
|
||||
pub use block_import_error::BlockImportError;
|
||||
pub use transaction_import::{TransactionImportResult, TransactionImportError};
|
||||
pub use transaction_import::TransactionImportResult;
|
||||
pub use transaction_import::TransactionImportError;
|
||||
|
||||
mod client {
|
||||
//! Blockchain database client.
|
||||
@@ -188,9 +189,6 @@ pub trait BlockChainClient : Sync + Send {
|
||||
/// Get last hashes starting from best block.
|
||||
fn last_hashes(&self) -> LastHashes;
|
||||
|
||||
/// import transactions from network/other 3rd party
|
||||
fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, TransactionImportError>>;
|
||||
|
||||
/// Queue transactions for importing.
|
||||
fn queue_transactions(&self, transactions: Vec<Bytes>);
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ use transaction::{Transaction, LocalizedTransaction, SignedTransaction, Action};
|
||||
use blockchain::TreeRoute;
|
||||
use client::{BlockChainClient, MiningBlockChainClient, BlockChainInfo, BlockStatus, BlockID,
|
||||
TransactionID, UncleID, TraceId, TraceFilter, LastHashes, CallAnalytics,
|
||||
TransactionImportError, BlockImportError};
|
||||
BlockImportError};
|
||||
use header::{Header as BlockHeader, BlockNumber};
|
||||
use filter::Filter;
|
||||
use log_entry::LocalizedLogEntry;
|
||||
@@ -39,8 +39,6 @@ use executive::Executed;
|
||||
use error::ExecutionError;
|
||||
use trace::LocalizedTrace;
|
||||
|
||||
use miner::{TransactionImportResult, AccountDetails};
|
||||
|
||||
/// Test client.
|
||||
pub struct TestBlockChainClient {
|
||||
/// Blocks.
|
||||
@@ -275,6 +273,10 @@ impl BlockChainClient for TestBlockChainClient {
|
||||
}
|
||||
}
|
||||
|
||||
fn latest_nonce(&self, address: &Address) -> U256 {
|
||||
self.nonce(address, BlockID::Latest).unwrap()
|
||||
}
|
||||
|
||||
fn code(&self, address: &Address) -> Option<Bytes> {
|
||||
self.code.read().unwrap().get(address).cloned()
|
||||
}
|
||||
@@ -287,6 +289,10 @@ impl BlockChainClient for TestBlockChainClient {
|
||||
}
|
||||
}
|
||||
|
||||
fn latest_balance(&self, address: &Address) -> U256 {
|
||||
self.balance(address, BlockID::Latest).unwrap()
|
||||
}
|
||||
|
||||
fn storage_at(&self, address: &Address, position: &H256, id: BlockID) -> Option<H256> {
|
||||
if let BlockID::Latest = id {
|
||||
Some(self.storage.read().unwrap().get(&(address.clone(), position.clone())).cloned().unwrap_or_else(H256::new))
|
||||
@@ -488,24 +494,10 @@ impl BlockChainClient for TestBlockChainClient {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, TransactionImportError>> {
|
||||
let nonces = self.nonces.read().unwrap();
|
||||
let balances = self.balances.read().unwrap();
|
||||
let fetch_account = |a: &Address| AccountDetails {
|
||||
nonce: nonces[a],
|
||||
balance: balances[a],
|
||||
};
|
||||
|
||||
self.miner.import_transactions(self, transactions, &fetch_account)
|
||||
.into_iter()
|
||||
.map(|res| res.map_err(|e| e.into()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn queue_transactions(&self, transactions: Vec<Bytes>) {
|
||||
// import right here
|
||||
let tx = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
|
||||
self.import_transactions(tx);
|
||||
let txs = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
|
||||
self.miner.import_external_transactions(self, txs);
|
||||
}
|
||||
|
||||
fn pending_transactions(&self) -> Vec<SignedTransaction> {
|
||||
|
||||
@@ -20,7 +20,6 @@ use std::time::{Instant, Duration};
|
||||
|
||||
use util::*;
|
||||
use util::using_queue::{UsingQueue, GetAction};
|
||||
use util::Colour::White;
|
||||
use account_provider::AccountProvider;
|
||||
use views::{BlockView, HeaderView};
|
||||
use client::{MiningBlockChainClient, Executive, Executed, EnvInfo, TransactOptions, BlockID, CallAnalytics};
|
||||
@@ -316,6 +315,19 @@ impl Miner {
|
||||
!have_work
|
||||
}
|
||||
|
||||
fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, origin: TransactionOrigin, transaction_queue: &mut TransactionQueue) ->
|
||||
Vec<Result<TransactionImportResult, Error>> {
|
||||
|
||||
let fetch_account = |a: &Address| AccountDetails {
|
||||
nonce: chain.latest_nonce(a),
|
||||
balance: chain.latest_balance(a),
|
||||
};
|
||||
|
||||
transactions.into_iter()
|
||||
.map(|tx| transaction_queue.add(tx, &fetch_account, origin))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Are we allowed to do a non-mandatory reseal?
|
||||
fn tx_reseal_allowed(&self) -> bool { Instant::now() > *self.next_allowed_reseal.lock().unwrap() }
|
||||
}
|
||||
@@ -478,27 +490,24 @@ impl MinerService for Miner {
|
||||
self.gas_range_target.read().unwrap().1
|
||||
}
|
||||
|
||||
fn import_transactions<T>(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) ->
|
||||
Vec<Result<TransactionImportResult, Error>>
|
||||
where T: Fn(&Address) -> AccountDetails {
|
||||
let results: Vec<Result<TransactionImportResult, Error>> = {
|
||||
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
||||
transactions.into_iter()
|
||||
.map(|tx| transaction_queue.add(tx, &fetch_account, TransactionOrigin::External))
|
||||
.collect()
|
||||
};
|
||||
if !results.is_empty() && self.options.reseal_on_external_tx && self.tx_reseal_allowed() {
|
||||
fn import_external_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>) ->
|
||||
Vec<Result<TransactionImportResult, Error>> {
|
||||
|
||||
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
||||
let results = self.add_transactions_to_queue(chain, transactions, TransactionOrigin::External,
|
||||
&mut transaction_queue);
|
||||
|
||||
if !results.is_empty() && self.options.reseal_on_external_tx && self.tx_reseal_allowed() {
|
||||
self.update_sealing(chain);
|
||||
}
|
||||
results
|
||||
}
|
||||
|
||||
fn import_own_transaction<T>(
|
||||
fn import_own_transaction(
|
||||
&self,
|
||||
chain: &MiningBlockChainClient,
|
||||
transaction: SignedTransaction,
|
||||
fetch_account: T
|
||||
) -> Result<TransactionImportResult, Error> where T: Fn(&Address) -> AccountDetails {
|
||||
) -> Result<TransactionImportResult, Error> {
|
||||
|
||||
let hash = transaction.hash();
|
||||
trace!(target: "own_tx", "Importing transaction: {:?}", transaction);
|
||||
@@ -506,7 +515,7 @@ impl MinerService for Miner {
|
||||
let imported = {
|
||||
// Be sure to release the lock before we call enable_and_prepare_sealing
|
||||
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
||||
let import = transaction_queue.add(transaction, &fetch_account, TransactionOrigin::Local);
|
||||
let import = self.add_transactions_to_queue(chain, vec![transaction], TransactionOrigin::Local, &mut transaction_queue).pop().unwrap();
|
||||
|
||||
match import {
|
||||
Ok(ref res) => {
|
||||
@@ -645,7 +654,7 @@ impl MinerService for Miner {
|
||||
let n = sealed.header().number();
|
||||
let h = sealed.header().hash();
|
||||
try!(chain.import_sealed_block(sealed));
|
||||
info!(target: "miner", "Mined block imported OK. #{}: {}", paint(White.bold(), format!("{}", n)), paint(White.bold(), h.hex()));
|
||||
info!(target: "miner", "Mined block imported OK. #{}: {}", format!("{}", n).apply(Colour::White.bold()), h.hex().apply(Colour::White.bold()));
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
@@ -657,7 +666,12 @@ impl MinerService for Miner {
|
||||
// Client should send message after commit to db and inserting to chain.
|
||||
.expect("Expected in-chain blocks.");
|
||||
let block = BlockView::new(&block);
|
||||
block.transactions()
|
||||
let txs = block.transactions();
|
||||
// populate sender
|
||||
for tx in &txs {
|
||||
let _sender = tx.sender();
|
||||
}
|
||||
txs
|
||||
}
|
||||
|
||||
// 1. We ignore blocks that were `imported` (because it means that they are not in canon-chain, and transactions
|
||||
@@ -674,14 +688,10 @@ impl MinerService for Miner {
|
||||
.par_iter()
|
||||
.map(|h| fetch_transactions(chain, h));
|
||||
out_of_chain.for_each(|txs| {
|
||||
// populate sender
|
||||
for tx in &txs {
|
||||
let _sender = tx.sender();
|
||||
}
|
||||
let _ = self.import_transactions(chain, txs, |a| AccountDetails {
|
||||
nonce: chain.latest_nonce(a),
|
||||
balance: chain.latest_balance(a),
|
||||
});
|
||||
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
||||
let _ = self.add_transactions_to_queue(
|
||||
chain, txs, TransactionOrigin::External, &mut transaction_queue
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -107,14 +107,12 @@ pub trait MinerService : Send + Sync {
|
||||
fn set_tx_gas_limit(&self, limit: U256);
|
||||
|
||||
/// Imports transactions to transaction queue.
|
||||
fn import_transactions<T>(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) ->
|
||||
Vec<Result<TransactionImportResult, Error>>
|
||||
where T: Fn(&Address) -> AccountDetails, Self: Sized;
|
||||
fn import_external_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>) ->
|
||||
Vec<Result<TransactionImportResult, Error>>;
|
||||
|
||||
/// Imports own (node owner) transaction to queue.
|
||||
fn import_own_transaction<T>(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction, fetch_account: T) ->
|
||||
Result<TransactionImportResult, Error>
|
||||
where T: Fn(&Address) -> AccountDetails, Self: Sized;
|
||||
fn import_own_transaction(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction) ->
|
||||
Result<TransactionImportResult, Error>;
|
||||
|
||||
/// Returns hashes of transactions currently in pending
|
||||
fn pending_transactions_hashes(&self) -> Vec<H256>;
|
||||
|
||||
@@ -17,7 +17,6 @@
|
||||
//! Creates and registers client and network services.
|
||||
|
||||
use util::*;
|
||||
use util::Colour::{Yellow, White};
|
||||
use util::panics::*;
|
||||
use spec::Spec;
|
||||
use error::*;
|
||||
@@ -72,7 +71,7 @@ impl ClientService {
|
||||
try!(net_service.start());
|
||||
}
|
||||
|
||||
info!("Configured for {} using {} engine", paint(White.bold(), spec.name.clone()), paint(Yellow.bold(), spec.engine.name().to_owned()));
|
||||
info!("Configured for {} using {} engine", spec.name.clone().apply(Colour::White.bold()), spec.engine.name().apply(Colour::Yellow.bold()));
|
||||
let client = try!(Client::new(config, spec, db_path, miner, net_service.io().channel()));
|
||||
panic_handler.forward_from(client.deref());
|
||||
let client_io = Arc::new(ClientIoHandler {
|
||||
@@ -135,16 +134,14 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {
|
||||
|
||||
#[cfg_attr(feature="dev", allow(single_match))]
|
||||
fn message(&self, io: &IoContext<NetSyncMessage>, net_message: &NetSyncMessage) {
|
||||
if let UserMessage(ref message) = *net_message {
|
||||
match *message {
|
||||
SyncMessage::BlockVerified => {
|
||||
self.client.import_verified_blocks(&io.channel());
|
||||
},
|
||||
SyncMessage::NewTransactions(ref transactions) => {
|
||||
self.client.import_queued_transactions(&transactions);
|
||||
},
|
||||
_ => {}, // ignore other messages
|
||||
}
|
||||
match *net_message {
|
||||
UserMessage(ref message) => match *message {
|
||||
SyncMessage::BlockVerified => { self.client.import_verified_blocks(&io.channel()); }
|
||||
SyncMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(&transactions); }
|
||||
_ => {} // ignore other messages
|
||||
},
|
||||
NetworkIoMessage::NetworkStarted(ref url) => { self.client.network_started(url); }
|
||||
_ => {} // ignore other messages
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user