Merge pull request #4501 from ethcore/light-txq

Light Client transaction queue, initial LightDispatcher
This commit is contained in:
Robert Habermeier 2017-02-15 14:06:31 +01:00 committed by GitHub
commit 36ea5550ba
13 changed files with 805 additions and 85 deletions

View File

@ -16,24 +16,22 @@
//! Light client implementation. Stores data from light sync
use std::sync::Arc;
use ethcore::block_import_error::BlockImportError;
use ethcore::block_status::BlockStatus;
use ethcore::client::ClientReport;
use ethcore::engines::Engine;
use ethcore::ids::BlockId;
use ethcore::header::Header;
use ethcore::verification::queue::{self, HeaderQueue};
use ethcore::transaction::{PendingTransaction, Condition as TransactionCondition};
use ethcore::blockchain_info::BlockChainInfo;
use ethcore::spec::Spec;
use ethcore::service::ClientIoMessage;
use ethcore::encoded;
use io::IoChannel;
use util::hash::{H256, H256FastMap};
use util::{Bytes, Mutex, RwLock};
use provider::Provider;
use request;
use util::{Bytes, H256, Mutex, RwLock};
use self::header_chain::HeaderChain;
@ -58,6 +56,12 @@ pub trait LightChainClient: Send + Sync {
/// parent queued prior.
fn queue_header(&self, header: Header) -> Result<H256, BlockImportError>;
/// Attempt to get block header by block id.
fn block_header(&self, id: BlockId) -> Option<encoded::Header>;
/// Get the best block header.
fn best_block_header(&self) -> encoded::Header;
/// Query whether a block is known.
fn is_known(&self, hash: &H256) -> bool;
@ -74,11 +78,26 @@ pub trait LightChainClient: Send + Sync {
fn cht_root(&self, i: usize) -> Option<H256>;
}
/// Something which can be treated as a `LightChainClient`.
pub trait AsLightClient {
/// The kind of light client this can be treated as.
type Client: LightChainClient;
/// Access the underlying light client.
fn as_light_client(&self) -> &Self::Client;
}
impl<T: LightChainClient> AsLightClient for T {
type Client = Self;
fn as_light_client(&self) -> &Self { self }
}
/// Light client implementation.
pub struct Client {
queue: HeaderQueue,
engine: Arc<Engine>,
chain: HeaderChain,
tx_pool: Mutex<H256FastMap<PendingTransaction>>,
report: RwLock<ClientReport>,
import_lock: Mutex<()>,
}
@ -88,8 +107,8 @@ impl Client {
pub fn new(config: Config, spec: &Spec, io_channel: IoChannel<ClientIoMessage>) -> Self {
Client {
queue: HeaderQueue::new(config.queue, spec.engine.clone(), io_channel, true),
engine: spec.engine.clone(),
chain: HeaderChain::new(&::rlp::encode(&spec.genesis_header())),
tx_pool: Mutex::new(Default::default()),
report: RwLock::new(ClientReport::default()),
import_lock: Mutex::new(()),
}
@ -100,25 +119,6 @@ impl Client {
self.queue.import(header).map_err(Into::into)
}
/// Import a local transaction.
pub fn import_own_transaction(&self, tx: PendingTransaction) {
self.tx_pool.lock().insert(tx.transaction.hash(), tx);
}
/// Fetch a vector of all pending transactions.
pub fn ready_transactions(&self) -> Vec<PendingTransaction> {
let best = self.chain.best_header();
self.tx_pool.lock()
.values()
.filter(|t| match t.condition {
Some(TransactionCondition::Number(x)) => x <= best.number(),
Some(TransactionCondition::Timestamp(x)) => x <= best.timestamp(),
None => true,
})
.cloned()
.collect()
}
/// Inquire about the status of a given header.
pub fn status(&self, hash: &H256) -> BlockStatus {
match self.queue.status(hash) {
@ -159,6 +159,11 @@ impl Client {
self.chain.block_header(id)
}
/// Get the best block header.
pub fn best_block_header(&self) -> encoded::Header {
self.chain.best_header()
}
/// Flush the header queue.
pub fn flush_queue(&self) {
self.queue.flush()
@ -207,6 +212,11 @@ impl Client {
self.chain.heap_size_of_children()
}
/// Get a handle to the verification engine.
pub fn engine(&self) -> &Engine {
&*self.engine
}
}
impl LightChainClient for Client {
@ -216,6 +226,14 @@ impl LightChainClient for Client {
self.import_header(header)
}
fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
Client::block_header(self, id)
}
fn best_block_header(&self) -> encoded::Header {
Client::best_block_header(self)
}
fn is_known(&self, hash: &H256) -> bool {
self.status(hash) == BlockStatus::InChain
}
@ -237,8 +255,8 @@ impl LightChainClient for Client {
}
}
// dummy implementation -- may draw from canonical cache further on.
impl Provider for Client {
// dummy implementation, should be removed when a `TestClient` is added.
impl ::provider::Provider for Client {
fn chain_info(&self) -> BlockChainInfo {
Client::chain_info(self)
}
@ -263,19 +281,19 @@ impl Provider for Client {
None
}
fn state_proof(&self, _req: request::StateProof) -> Vec<Bytes> {
fn state_proof(&self, _req: ::request::StateProof) -> Vec<Bytes> {
Vec::new()
}
fn contract_code(&self, _req: request::ContractCode) -> Bytes {
fn contract_code(&self, _req: ::request::ContractCode) -> Bytes {
Vec::new()
}
fn header_proof(&self, _req: request::HeaderProof) -> Option<(encoded::Header, Vec<Bytes>)> {
fn header_proof(&self, _req: ::request::HeaderProof) -> Option<(encoded::Header, Vec<Bytes>)> {
None
}
fn ready_transactions(&self) -> Vec<PendingTransaction> {
fn ready_transactions(&self) -> Vec<::ethcore::transaction::PendingTransaction> {
Vec::new()
}
}

View File

@ -36,6 +36,7 @@ pub mod client;
pub mod cht;
pub mod net;
pub mod on_demand;
pub mod transaction_queue;
#[cfg(not(feature = "ipc"))]
pub mod provider;
@ -54,6 +55,7 @@ pub mod remote {
mod types;
pub use self::provider::Provider;
pub use self::transaction_queue::TransactionQueue;
pub use types::les_request as request;
#[macro_use]

View File

@ -17,15 +17,19 @@
//! A provider for the LES protocol. This is typically a full node, who can
//! give as much data as necessary to its peers.
use std::sync::Arc;
use ethcore::blockchain_info::BlockChainInfo;
use ethcore::client::{BlockChainClient, ProvingBlockChainClient};
use ethcore::transaction::PendingTransaction;
use ethcore::ids::BlockId;
use ethcore::encoded;
use util::{Bytes, RwLock, H256};
use cht::{self, BlockInfo};
use client::{LightChainClient, AsLightClient};
use transaction_queue::TransactionQueue;
use util::{Bytes, H256};
use request;
@ -284,6 +288,75 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
}
}
/// The light client "provider" implementation. This wraps a `LightClient` and
/// a light transaction queue.
pub struct LightProvider<L> {
client: Arc<L>,
txqueue: Arc<RwLock<TransactionQueue>>,
}
impl<L> LightProvider<L> {
/// Create a new `LightProvider` from the given client and transaction queue.
pub fn new(client: Arc<L>, txqueue: Arc<RwLock<TransactionQueue>>) -> Self {
LightProvider {
client: client,
txqueue: txqueue,
}
}
}
// TODO: draw from cache (shared between this and the RPC layer)
impl<L: AsLightClient + Send + Sync> Provider for LightProvider<L> {
fn chain_info(&self) -> BlockChainInfo {
self.client.as_light_client().chain_info()
}
fn reorg_depth(&self, _a: &H256, _b: &H256) -> Option<u64> {
None
}
fn earliest_state(&self) -> Option<u64> {
None
}
fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
self.client.as_light_client().block_header(id)
}
fn block_body(&self, _id: BlockId) -> Option<encoded::Body> {
None
}
fn block_receipts(&self, _hash: &H256) -> Option<Bytes> {
None
}
fn state_proof(&self, _req: request::StateProof) -> Vec<Bytes> {
Vec::new()
}
fn contract_code(&self, _req: request::ContractCode) -> Bytes {
Vec::new()
}
fn header_proof(&self, _req: request::HeaderProof) -> Option<(encoded::Header, Vec<Bytes>)> {
None
}
fn ready_transactions(&self) -> Vec<PendingTransaction> {
let chain_info = self.chain_info();
self.txqueue.read().ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp)
}
}
impl<L: AsLightClient> AsLightClient for LightProvider<L> {
type Client = L::Client;
fn as_light_client(&self) -> &L::Client {
self.client.as_light_client()
}
}
#[cfg(test)]
mod tests {
use ethcore::client::{EachBlockWith, TestBlockChainClient};

View File

@ -0,0 +1,474 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Light Transaction Queue.
//!
//! Manages local transactions,
//! but stores all local transactions, removing only on invalidated nonce.
//!
//! Under the assumption that light nodes will have a relatively limited set of
//! accounts for which they create transactions, this queue is structured in an
//! address-wise manner.
use std::collections::{BTreeMap, HashMap};
use std::collections::hash_map::Entry;
use ethcore::error::TransactionError;
use ethcore::transaction::{Condition, PendingTransaction, SignedTransaction};
use ethcore::transaction_import::TransactionImportResult;
use util::{Address, U256, H256, H256FastMap};
// Knowledge of an account's current nonce.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CurrentNonce {
// Assumed current nonce.
Assumed(U256),
// Known current nonce.
Known(U256),
}
impl CurrentNonce {
// whether this nonce is assumed
fn is_assumed(&self) -> bool {
match *self {
CurrentNonce::Assumed(_) => true,
CurrentNonce::Known(_) => false,
}
}
// whether this nonce is known for certain from an external source.
fn is_known(&self) -> bool {
!self.is_assumed()
}
// the current nonce's value.
fn value(&self) -> &U256 {
match *self {
CurrentNonce::Assumed(ref val) => val,
CurrentNonce::Known(ref val) => val,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionInfo {
hash: H256,
nonce: U256,
condition: Option<Condition>,
}
impl<'a> From<&'a PendingTransaction> for TransactionInfo {
fn from(tx: &'a PendingTransaction) -> Self {
TransactionInfo {
hash: tx.hash(),
nonce: tx.nonce.clone(),
condition: tx.condition.clone(),
}
}
}
// transactions associated with a specific account.
#[derive(Debug, Clone, PartialEq, Eq)]
struct AccountTransactions {
// believed current nonce (gotten from initial given TX or `cull` calls).
cur_nonce: CurrentNonce,
current: Vec<TransactionInfo>, // ordered "current" transactions (cur_nonce onwards)
future: BTreeMap<U256, TransactionInfo>, // "future" transactions.
}
impl AccountTransactions {
fn is_empty(&self) -> bool {
self.current.is_empty() && self.future.is_empty()
}
fn next_nonce(&self) -> U256 {
self.current.last().map(|last| last.nonce + 1.into())
.unwrap_or_else(|| *self.cur_nonce.value())
}
// attempt to move transactions from the future queue into the current queue.
fn adjust_future(&mut self) {
let mut next_nonce = self.next_nonce();
loop {
match self.future.remove(&next_nonce) {
Some(tx) => self.current.push(tx),
None => break,
}
next_nonce = next_nonce + 1.into();
}
}
}
/// Light transaction queue. See module docs for more details.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct TransactionQueue {
by_account: HashMap<Address, AccountTransactions>,
by_hash: H256FastMap<PendingTransaction>,
}
impl TransactionQueue {
/// Import a pending transaction to be queued.
pub fn import(&mut self, tx: PendingTransaction) -> Result<TransactionImportResult, TransactionError> {
let sender = tx.sender();
let hash = tx.hash();
let nonce = tx.nonce;
let tx_info = TransactionInfo::from(&tx);
if self.by_hash.contains_key(&hash) { return Err(TransactionError::AlreadyImported) }
let res = match self.by_account.entry(sender) {
Entry::Vacant(entry) => {
entry.insert(AccountTransactions {
cur_nonce: CurrentNonce::Assumed(nonce),
current: vec![tx_info],
future: BTreeMap::new(),
});
TransactionImportResult::Current
}
Entry::Occupied(mut entry) => {
let acct_txs = entry.get_mut();
if &nonce < acct_txs.cur_nonce.value() {
// don't accept txs from before known current nonce.
if acct_txs.cur_nonce.is_known() {
return Err(TransactionError::Old)
}
// lower our assumption until corrected later.
acct_txs.cur_nonce = CurrentNonce::Assumed(nonce);
}
match acct_txs.current.binary_search_by(|x| x.nonce.cmp(&nonce)) {
Ok(idx) => {
trace!(target: "txqueue", "Replacing existing transaction from {} with nonce {}",
sender, nonce);
let old = ::std::mem::replace(&mut acct_txs.current[idx], tx_info);
self.by_hash.remove(&old.hash);
TransactionImportResult::Current
}
Err(idx) => {
let cur_len = acct_txs.current.len();
let incr_nonce = nonce + 1.into();
// current is sorted with one tx per nonce,
// so if a tx with given nonce wasn't found that means it is either
// earlier in nonce than all other "current" transactions or later.
assert!(idx == 0 || idx == cur_len);
if idx == 0 && acct_txs.current.first().map_or(false, |f| f.nonce != incr_nonce) {
let old_cur = ::std::mem::replace(&mut acct_txs.current, vec![tx_info]);
trace!(target: "txqueue", "Moving {} transactions with nonce > {} to future",
old_cur.len(), incr_nonce);
for future in old_cur {
let future_nonce = future.nonce;
acct_txs.future.insert(future_nonce, future);
}
TransactionImportResult::Current
} else if idx == cur_len && acct_txs.current.last().map_or(false, |f| f.nonce + 1.into() != nonce) {
trace!(target: "txqueue", "Queued future transaction for {}, nonce={}", sender, nonce);
let future_nonce = nonce;
acct_txs.future.insert(future_nonce, tx_info);
TransactionImportResult::Future
} else {
trace!(target: "txqueue", "Queued current transaction for {}, nonce={}", sender, nonce);
// insert, then check if we've filled any gaps.
acct_txs.current.insert(idx, tx_info);
acct_txs.adjust_future();
TransactionImportResult::Current
}
}
}
}
};
self.by_hash.insert(hash, tx);
Ok(res)
}
/// Get pending transaction by hash.
pub fn transaction(&self, hash: &H256) -> Option<SignedTransaction> {
self.by_hash.get(hash).map(|tx| (&**tx).clone())
}
/// Get the next nonce for a given address based on what's within the queue.
/// If the address has no queued transactions, then `None` will be returned
/// and the next nonce will have to be deduced via other means.
pub fn next_nonce(&self, address: &Address) -> Option<U256> {
self.by_account.get(address).map(AccountTransactions::next_nonce)
}
/// Get all transactions ready to be propagated.
/// `best_block_number` and `best_block_timestamp` are used to filter out conditionally
/// propagated transactions.
///
/// Returned transactions are batched by sender, in order of ascending nonce.
pub fn ready_transactions(&self, best_block_number: u64, best_block_timestamp: u64) -> Vec<PendingTransaction> {
self.by_account.values()
.flat_map(|acct_txs| {
acct_txs.current.iter().take_while(|tx| match tx.condition {
None => true,
Some(Condition::Number(blk_num)) => blk_num <= best_block_number,
Some(Condition::Timestamp(time)) => time <= best_block_timestamp,
}).map(|info| info.hash)
})
.filter_map(|hash| match self.by_hash.get(&hash) {
Some(tx) => Some(tx.clone()),
None => {
warn!(target: "txqueue", "Inconsistency detected between `by_hash` and `by_account`: {} not stored.",
hash);
None
}
})
.collect()
}
/// Addresses for which we store transactions.
pub fn queued_senders(&self) -> Vec<Address> {
self.by_account.keys().cloned().collect()
}
/// Cull out all transactions by the given address which are invalidated by the given nonce.
pub fn cull(&mut self, address: Address, cur_nonce: U256) {
let mut removed_hashes = vec![];
if let Entry::Occupied(mut entry) = self.by_account.entry(address) {
{
let acct_txs = entry.get_mut();
acct_txs.cur_nonce = CurrentNonce::Known(cur_nonce);
// cull old "future" keys.
let old_future: Vec<_> = acct_txs.future.keys().take_while(|&&k| k < cur_nonce).cloned().collect();
for old in old_future {
let hash = acct_txs.future.remove(&old)
.expect("key extracted from keys iterator; known to exist; qed")
.hash;
removed_hashes.push(hash);
}
// then cull from "current".
let valid_pos = acct_txs.current.iter().position(|tx| tx.nonce >= cur_nonce);
match valid_pos {
None =>
removed_hashes.extend(acct_txs.current.drain(..).map(|tx| tx.hash)),
Some(valid) =>
removed_hashes.extend(acct_txs.current.drain(..valid).map(|tx| tx.hash)),
}
// now try and move stuff out of future into current.
acct_txs.adjust_future();
}
if entry.get_mut().is_empty() {
trace!(target: "txqueue", "No more queued transactions for {} after nonce {}",
address, cur_nonce);
entry.remove();
}
}
trace!(target: "txqueue", "Culled {} old transactions from sender {} (nonce={})",
removed_hashes.len(), address, cur_nonce);
for hash in removed_hashes {
self.by_hash.remove(&hash);
}
}
}
#[cfg(test)]
mod tests {
use super::TransactionQueue;
use util::Address;
use ethcore::transaction::{Transaction, PendingTransaction, Condition};
#[test]
fn queued_senders() {
let sender = Address::default();
let mut txq = TransactionQueue::default();
let tx = Transaction::default().fake_sign(sender);
txq.import(tx.into()).unwrap();
assert_eq!(txq.queued_senders(), vec![sender]);
txq.cull(sender, 1.into());
assert_eq!(txq.queued_senders(), vec![]);
assert!(txq.by_hash.is_empty());
}
#[test]
fn next_nonce() {
let sender = Address::default();
let mut txq = TransactionQueue::default();
for i in (0..5).chain(10..15) {
let mut tx = Transaction::default();
tx.nonce = i.into();
let tx = tx.fake_sign(sender);
txq.import(tx.into()).unwrap();
}
// current: 0..5, future: 10..15
assert_eq!(txq.ready_transactions(0, 0).len(), 5);
assert_eq!(txq.next_nonce(&sender).unwrap(), 5.into());
txq.cull(sender, 8.into());
// current: empty, future: 10..15
assert_eq!(txq.ready_transactions(0, 0).len(), 0);
assert_eq!(txq.next_nonce(&sender).unwrap(), 8.into());
txq.cull(sender, 10.into());
// current: 10..15, future: empty
assert_eq!(txq.ready_transactions(0, 0).len(), 5);
assert_eq!(txq.next_nonce(&sender).unwrap(), 15.into());
}
#[test]
fn current_to_future() {
let sender = Address::default();
let mut txq = TransactionQueue::default();
for i in 5..10 {
let mut tx = Transaction::default();
tx.nonce = i.into();
let tx = tx.fake_sign(sender);
txq.import(tx.into()).unwrap();
}
assert_eq!(txq.ready_transactions(0, 0).len(), 5);
assert_eq!(txq.next_nonce(&sender).unwrap(), 10.into());
for i in 0..3 {
let mut tx = Transaction::default();
tx.nonce = i.into();
let tx = tx.fake_sign(sender);
txq.import(tx.into()).unwrap();
}
assert_eq!(txq.ready_transactions(0, 0).len(), 3);
assert_eq!(txq.next_nonce(&sender).unwrap(), 3.into());
for i in 3..5 {
let mut tx = Transaction::default();
tx.nonce = i.into();
let tx = tx.fake_sign(sender);
txq.import(tx.into()).unwrap();
}
assert_eq!(txq.ready_transactions(0, 0).len(), 10);
assert_eq!(txq.next_nonce(&sender).unwrap(), 10.into());
}
#[test]
fn conditional() {
let mut txq = TransactionQueue::default();
let sender = Address::default();
for i in 0..5 {
let mut tx = Transaction::default();
tx.nonce = i.into();
let tx = tx.fake_sign(sender);
txq.import(match i {
3 => PendingTransaction::new(tx, Some(Condition::Number(100))),
4 => PendingTransaction::new(tx, Some(Condition::Timestamp(1234))),
_ => tx.into(),
}).unwrap();
}
assert_eq!(txq.ready_transactions(0, 0).len(), 3);
assert_eq!(txq.ready_transactions(0, 1234).len(), 3);
assert_eq!(txq.ready_transactions(100, 0).len(), 4);
assert_eq!(txq.ready_transactions(100, 1234).len(), 5);
}
#[test]
fn cull_from_future() {
let sender = Address::default();
let mut txq = TransactionQueue::default();
for i in (0..1).chain(3..10) {
let mut tx = Transaction::default();
tx.nonce = i.into();
let tx = tx.fake_sign(sender);
txq.import(tx.into()).unwrap();
}
txq.cull(sender, 6.into());
assert_eq!(txq.ready_transactions(0, 0).len(), 4);
assert_eq!(txq.next_nonce(&sender).unwrap(), 10.into());
}
#[test]
fn import_old() {
let sender = Address::default();
let mut txq = TransactionQueue::default();
let mut tx_a = Transaction::default();
tx_a.nonce = 3.into();
let mut tx_b = Transaction::default();
tx_b.nonce = 2.into();
txq.import(tx_a.fake_sign(sender).into()).unwrap();
txq.cull(sender, 3.into());
assert!(txq.import(tx_b.fake_sign(sender).into()).is_err())
}
#[test]
fn replace_is_removed() {
let sender = Address::default();
let mut txq = TransactionQueue::default();
let tx_b: PendingTransaction = Transaction::default().fake_sign(sender).into();
let tx_a: PendingTransaction = {
let mut tx_a = Transaction::default();
tx_a.gas_price = tx_b.gas_price + 1.into();
tx_a.fake_sign(sender).into()
};
let hash = tx_a.hash();
txq.import(tx_a).unwrap();
txq.import(tx_b).unwrap();
assert!(txq.transaction(&hash).is_none());
}
}

View File

@ -18,14 +18,18 @@
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::Weak;
use std::sync::{Arc, Weak};
use futures::{future, Future, BoxFuture};
use light::client::LightChainClient;
use light::on_demand::{request, OnDemand};
use light::TransactionQueue as LightTransactionQueue;
use rlp::{self, Stream};
use util::{Address, H520, H256, U256, Uint, Bytes};
use util::{Address, H520, H256, U256, Uint, Bytes, RwLock};
use util::sha3::Hashable;
use ethkey::Signature;
use ethsync::LightSync;
use ethcore::miner::MinerService;
use ethcore::client::MiningBlockChainClient;
use ethcore::transaction::{Action, SignedTransaction, PendingTransaction, Transaction};
@ -55,7 +59,7 @@ pub trait Dispatcher: Send + Sync + Clone {
-> BoxFuture<FilledTransactionRequest, Error>;
/// Sign the given transaction request without dispatching, fetching appropriate nonce.
fn sign(&self, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith)
fn sign(&self, accounts: Arc<AccountProvider>, filled: FilledTransactionRequest, password: SignWith)
-> BoxFuture<WithToken<SignedTransaction>, Error>;
/// "Dispatch" a local transaction.
@ -108,13 +112,13 @@ impl<C: MiningBlockChainClient, M: MinerService> Dispatcher for FullDispatcher<C
}).boxed()
}
fn sign(&self, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith)
fn sign(&self, accounts: Arc<AccountProvider>, filled: FilledTransactionRequest, password: SignWith)
-> BoxFuture<WithToken<SignedTransaction>, Error>
{
let (client, miner) = (take_weakf!(self.client), take_weakf!(self.miner));
let network_id = client.signing_network_id();
let address = filled.from;
future::ok({
future::done({
let t = Transaction {
nonce: filled.nonce
.or_else(|| miner
@ -129,31 +133,15 @@ impl<C: MiningBlockChainClient, M: MinerService> Dispatcher for FullDispatcher<C
data: filled.data,
};
let hash = t.hash(network_id);
if accounts.is_hardware_address(address) {
let mut stream = rlp::RlpStream::new();
t.rlp_append_unsigned_transaction(&mut stream, network_id);
let signature = try_bf!(
accounts.sign_with_hardware(address, &stream.as_raw())
.map_err(|e| {
debug!(target: "miner", "Error signing transaction with hardware wallet: {}", e);
errors::account("Error signing transaction with hardware wallet", e)
})
);
let signed = try_bf!(
SignedTransaction::new(t.with_signature(signature, network_id))
.map_err(|e| {
debug!(target: "miner", "Hardware wallet has produced invalid signature: {}", e);
errors::account("Invalid signature generated", e)
})
);
WithToken::No(signed)
hardware_signature(&*accounts, address, t, network_id).map(WithToken::No)
} else {
let signature = try_bf!(signature(accounts, address, hash, password));
signature.map(|sig| {
let hash = t.hash(network_id);
let signature = try_bf!(signature(&*accounts, address, hash, password));
Ok(signature.map(|sig| {
SignedTransaction::new(t.with_signature(sig, network_id))
.expect("Transaction was signed by AccountsProvider; it never produces invalid signatures; qed")
})
}))
}
}).boxed()
}
@ -167,6 +155,117 @@ impl<C: MiningBlockChainClient, M: MinerService> Dispatcher for FullDispatcher<C
}
}
/// Dispatcher for light clients -- fetches default gas price, next nonce, etc. from network.
/// Light client `ETH` RPC.
#[derive(Clone)]
pub struct LightDispatcher {
sync: Arc<LightSync>,
client: Arc<LightChainClient>,
on_demand: Arc<OnDemand>,
transaction_queue: Arc<RwLock<LightTransactionQueue>>,
}
impl LightDispatcher {
/// Create a new `LightDispatcher` from its requisite parts.
///
/// For correct operation, the OnDemand service is assumed to be registered as a network handler,
pub fn new(
sync: Arc<LightSync>,
client: Arc<LightChainClient>,
on_demand: Arc<OnDemand>,
transaction_queue: Arc<RwLock<LightTransactionQueue>>,
) -> Self {
LightDispatcher {
sync: sync,
client: client,
on_demand: on_demand,
transaction_queue: transaction_queue,
}
}
}
impl Dispatcher for LightDispatcher {
fn fill_optional_fields(&self, request: TransactionRequest, default_sender: Address)
-> BoxFuture<FilledTransactionRequest, Error>
{
let request = request;
let gas_limit = self.client.best_block_header().gas_limit();
future::ok(FilledTransactionRequest {
from: request.from.unwrap_or(default_sender),
used_default_from: request.from.is_none(),
to: request.to,
nonce: request.nonce,
gas_price: request.gas_price.unwrap_or_else(|| 21_000_000.into()), // TODO: fetch corpus from network.
gas: request.gas.unwrap_or_else(|| gas_limit / 3.into()),
value: request.value.unwrap_or_else(|| 0.into()),
data: request.data.unwrap_or_else(Vec::new),
condition: request.condition,
}).boxed()
}
fn sign(&self, accounts: Arc<AccountProvider>, filled: FilledTransactionRequest, password: SignWith)
-> BoxFuture<WithToken<SignedTransaction>, Error>
{
let network_id = None; // TODO: fetch from client.
let address = filled.from;
let best_header = self.client.best_block_header();
let with_nonce = move |filled: FilledTransactionRequest, nonce| {
let t = Transaction {
nonce: nonce,
action: filled.to.map_or(Action::Create, Action::Call),
gas: filled.gas,
gas_price: filled.gas_price,
value: filled.value,
data: filled.data,
};
if accounts.is_hardware_address(address) {
return hardware_signature(&*accounts, address, t, network_id).map(WithToken::No)
}
let hash = t.hash(network_id);
let signature = signature(&*accounts, address, hash, password)?;
Ok(signature.map(|sig| {
SignedTransaction::new(t.with_signature(sig, network_id))
.expect("Transaction was signed by AccountsProvider; it never produces invalid signatures; qed")
}))
};
// fast path where we don't go to network; nonce provided or can be gotten from queue.
let maybe_nonce = filled.nonce.or_else(|| self.transaction_queue.read().next_nonce(&address));
if let Some(nonce) = maybe_nonce {
return future::done(with_nonce(filled, nonce)).boxed()
}
let nonce_future = self.sync.with_context(|ctx| self.on_demand.account(ctx, request::Account {
header: best_header,
address: address,
}));
let nonce_future = match nonce_future {
Some(x) => x,
None => return future::err(errors::no_light_peers()).boxed()
};
nonce_future
.map_err(|_| errors::no_light_peers())
.and_then(move |acc| with_nonce(filled, acc.nonce))
.boxed()
}
fn dispatch_transaction(&self, signed_transaction: PendingTransaction) -> Result<H256, Error> {
let hash = signed_transaction.transaction.hash();
self.transaction_queue.write().import(signed_transaction)
.map_err(Into::into)
.map_err(errors::from_transaction_error)
.map(|_| hash)
}
}
/// default MAC to use.
pub const DEFAULT_MAC: [u8; 2] = [0, 0];
@ -251,7 +350,7 @@ impl<T: Debug> From<(T, Option<AccountToken>)> for WithToken<T> {
/// Execute a confirmation payload.
pub fn execute<D: Dispatcher + 'static>(
dispatcher: D,
accounts: &AccountProvider,
accounts: Arc<AccountProvider>,
payload: ConfirmationPayload,
pass: SignWith
) -> BoxFuture<WithToken<ConfirmationResponse>, Error> {
@ -281,7 +380,7 @@ pub fn execute<D: Dispatcher + 'static>(
format!("\x19Ethereum Signed Message:\n{}", data.len())
.into_bytes();
message_data.append(&mut data);
let res = signature(accounts, address, message_data.sha3(), pass)
let res = signature(&accounts, address, message_data.sha3(), pass)
.map(|result| result
.map(|rsv| {
let mut vrs = [0u8; 65];
@ -297,7 +396,7 @@ pub fn execute<D: Dispatcher + 'static>(
future::done(res).boxed()
},
ConfirmationPayload::Decrypt(address, data) => {
let res = decrypt(accounts, address, data, pass)
let res = decrypt(&accounts, address, data, pass)
.map(|result| result
.map(RpcBytes)
.map(ConfirmationResponse::Decrypt)
@ -318,6 +417,27 @@ fn signature(accounts: &AccountProvider, address: Address, hash: H256, password:
})
}
// obtain a hardware signature from the given account.
fn hardware_signature(accounts: &AccountProvider, address: Address, t: Transaction, network_id: Option<u64>)
-> Result<SignedTransaction, Error>
{
debug_assert!(accounts.is_hardware_address(address));
let mut stream = rlp::RlpStream::new();
t.rlp_append_unsigned_transaction(&mut stream, network_id);
let signature = accounts.sign_with_hardware(address, &stream.as_raw())
.map_err(|e| {
debug!(target: "miner", "Error signing transaction with hardware wallet: {}", e);
errors::account("Error signing transaction with hardware wallet", e)
})?;
SignedTransaction::new(t.with_signature(signature, network_id))
.map_err(|e| {
debug!(target: "miner", "Hardware wallet has produced invalid signature: {}", e);
errors::account("Invalid signature generated", e)
})
}
fn decrypt(accounts: &AccountProvider, address: Address, msg: Bytes, password: SignWith) -> Result<WithToken<Bytes>, Error> {
match password.clone() {
SignWith::Nothing => accounts.decrypt(address, None, &DEFAULT_MAC, &msg).map(WithToken::No),

View File

@ -49,6 +49,7 @@ mod codes {
pub const COMPILATION_ERROR: i64 = -32050;
pub const ENCRYPTION_ERROR: i64 = -32055;
pub const FETCH_ERROR: i64 = -32060;
pub const NO_LIGHT_PEERS: i64 = -32065;
}
pub fn unimplemented(details: Option<String>) -> Error {
@ -308,3 +309,11 @@ pub fn unknown_block() -> Error {
data: None,
}
}
pub fn no_light_peers() -> Error {
Error {
code: ErrorCode::ServerError(codes::NO_LIGHT_PEERS),
message: "No light peers who can serve data".into(),
data: None,
}
}

View File

@ -25,16 +25,18 @@ use jsonrpc_core::Error;
use jsonrpc_macros::Trailing;
use light::client::Client as LightClient;
use light::cht;
use light::{cht, TransactionQueue};
use light::on_demand::{request, OnDemand};
use ethcore::account_provider::{AccountProvider, DappId};
use ethcore::basic_account::BasicAccount;
use ethcore::encoded;
use ethcore::ids::BlockId;
use ethcore::transaction::SignedTransaction;
use ethsync::LightSync;
use rlp::{UntrustedRlp, View};
use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP};
use util::U256;
use util::{RwLock, U256};
use futures::{future, Future, BoxFuture};
use futures::sync::oneshot;
@ -56,6 +58,7 @@ pub struct EthClient {
sync: Arc<LightSync>,
client: Arc<LightClient>,
on_demand: Arc<OnDemand>,
transaction_queue: Arc<RwLock<TransactionQueue>>,
accounts: Arc<AccountProvider>,
}
@ -76,12 +79,14 @@ impl EthClient {
sync: Arc<LightSync>,
client: Arc<LightClient>,
on_demand: Arc<OnDemand>,
transaction_queue: Arc<RwLock<TransactionQueue>>,
accounts: Arc<AccountProvider>,
) -> Self {
EthClient {
sync: sync,
client: client,
on_demand: on_demand,
transaction_queue: transaction_queue,
accounts: accounts,
}
}
@ -300,11 +305,27 @@ impl Eth for EthClient {
}
fn send_raw_transaction(&self, raw: Bytes) -> Result<RpcH256, Error> {
Err(errors::unimplemented(None))
let best_header = self.client.best_block_header().decode();
UntrustedRlp::new(&raw.into_vec()).as_val()
.map_err(errors::from_rlp_error)
.and_then(|tx| {
self.client.engine().verify_transaction_basic(&tx, &best_header)
.map_err(errors::from_transaction_error)?;
let signed = SignedTransaction::new(tx).map_err(errors::from_transaction_error)?;
let hash = signed.hash();
self.transaction_queue.write().import(signed.into())
.map(|_| hash)
.map_err(Into::into)
.map_err(errors::from_transaction_error)
})
.map(Into::into)
}
fn submit_transaction(&self, raw: Bytes) -> Result<RpcH256, Error> {
Err(errors::unimplemented(None))
self.send_raw_transaction(raw)
}
fn call(&self, req: CallRequest, num: Trailing<BlockNumber>) -> Result<Bytes, Error> {

View File

@ -113,7 +113,7 @@ impl<D: Dispatcher + 'static> Personal for PersonalClient<D> {
dispatcher.fill_optional_fields(request.into(), default)
.and_then(move |filled| {
let condition = filled.condition.clone().map(Into::into);
dispatcher.sign(&accounts, filled, SignWith::Password(password))
dispatcher.sign(accounts, filled, SignWith::Password(password))
.map(|tx| tx.into_value())
.map(move |tx| PendingTransaction::new(tx, condition))
.map(move |tx| (tx, dispatcher))

View File

@ -52,7 +52,7 @@ impl<D: Dispatcher + 'static> SignerClient<D> {
}
fn confirm_internal<F, T>(&self, id: U256, modification: TransactionModification, f: F) -> BoxFuture<WithToken<ConfirmationResponse>, Error> where
F: FnOnce(D, &AccountProvider, ConfirmationPayload) -> T,
F: FnOnce(D, Arc<AccountProvider>, ConfirmationPayload) -> T,
T: IntoFuture<Item=WithToken<ConfirmationResponse>, Error=Error>,
T::Future: Send + 'static
{
@ -87,7 +87,7 @@ impl<D: Dispatcher + 'static> SignerClient<D> {
request.condition = condition.clone().map(Into::into);
}
}
let fut = f(dispatcher, &*accounts, payload);
let fut = f(dispatcher, accounts, payload);
fut.into_future().then(move |result| {
// Execute
if let Ok(ref response) = result {

View File

@ -95,7 +95,7 @@ impl<D: Dispatcher + 'static> SigningQueueClient<D> {
.and_then(move |payload| {
let sender = payload.sender();
if accounts.is_unlocked(sender) {
dispatch::execute(dispatcher, &accounts, payload, dispatch::SignWith::Nothing)
dispatch::execute(dispatcher, accounts, payload, dispatch::SignWith::Nothing)
.map(|v| v.into_value())
.map(DispatchResult::Value)
.boxed()

View File

@ -61,7 +61,7 @@ impl<D: Dispatcher + 'static> SigningUnsafeClient<D> {
let dis = self.dispatcher.clone();
dispatch::from_rpc(payload, default, &dis)
.and_then(move |payload| {
dispatch::execute(dis, &accounts, payload, dispatch::SignWith::Nothing)
dispatch::execute(dis, accounts, payload, dispatch::SignWith::Nothing)
})
.map(|v| v.into_value())
.boxed()

View File

@ -34,7 +34,7 @@ use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig};
use std::str::FromStr;
use parking_lot::RwLock;
use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT};
use light::client::LightChainClient;
use light::client::AsLightClient;
use light::Provider;
use light::net::{self as light_net, LightProtocol, Params as LightParams, Capabilities, Handler as LightHandler, EventContext};
@ -642,7 +642,7 @@ pub struct LightSync {
impl LightSync {
/// Create a new light sync service.
pub fn new<L>(params: LightSyncParams<L>) -> Result<Self, NetworkError>
where L: LightChainClient + Provider + 'static
where L: AsLightClient + Provider + Sync + Send + 'static
{
use light_sync::LightSync as SyncHandler;

View File

@ -36,7 +36,7 @@ use std::collections::HashMap;
use std::mem;
use std::sync::Arc;
use light::client::LightChainClient;
use light::client::{AsLightClient, LightChainClient};
use light::net::{
Announcement, Handler, BasicContext, EventContext,
Capabilities, ReqId, Status,
@ -106,8 +106,9 @@ impl AncestorSearch {
}
fn process_response<L>(self, ctx: &ResponseContext, client: &L) -> AncestorSearch
where L: LightChainClient
where L: AsLightClient
{
let client = client.as_light_client();
let first_num = client.chain_info().first_block_number.unwrap_or(0);
match self {
AncestorSearch::Awaiting(id, start, req) => {
@ -203,7 +204,7 @@ impl<'a> ResponseContext for ResponseCtx<'a> {
}
/// Light client synchronization manager. See module docs for more details.
pub struct LightSync<L: LightChainClient> {
pub struct LightSync<L: AsLightClient> {
best_seen: Mutex<Option<ChainInfo>>, // best seen block on the network.
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization.
client: Arc<L>,
@ -211,7 +212,7 @@ pub struct LightSync<L: LightChainClient> {
state: Mutex<SyncState>,
}
impl<L: LightChainClient> Handler for LightSync<L> {
impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
if !capabilities.serve_headers {
trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer());
@ -344,7 +345,7 @@ impl<L: LightChainClient> Handler for LightSync<L> {
}
// private helpers
impl<L: LightChainClient> LightSync<L> {
impl<L: AsLightClient> LightSync<L> {
// Begins a search for the common ancestor and our best block.
// does not lock state, instead has a mutable reference to it passed.
fn begin_search(&self, state: &mut SyncState) {
@ -354,8 +355,8 @@ impl<L: LightChainClient> LightSync<L> {
return;
}
self.client.flush_queue();
let chain_info = self.client.chain_info();
self.client.as_light_client().flush_queue();
let chain_info = self.client.as_light_client().chain_info();
trace!(target: "sync", "Beginning search for common ancestor from {:?}",
(chain_info.best_block_number, chain_info.best_block_hash));
@ -366,8 +367,10 @@ impl<L: LightChainClient> LightSync<L> {
fn maintain_sync(&self, ctx: &BasicContext) {
const DRAIN_AMOUNT: usize = 128;
let client = self.client.as_light_client();
let chain_info = client.chain_info();
let mut state = self.state.lock();
let chain_info = self.client.chain_info();
debug!(target: "sync", "Maintaining sync ({:?})", &*state);
// drain any pending blocks into the queue.
@ -376,7 +379,7 @@ impl<L: LightChainClient> LightSync<L> {
'a:
loop {
if self.client.queue_info().is_full() { break }
if client.queue_info().is_full() { break }
*state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Rounds(round)
@ -388,7 +391,7 @@ impl<L: LightChainClient> LightSync<L> {
trace!(target: "sync", "Drained {} headers to import", sink.len());
for header in sink.drain(..) {
if let Err(e) = self.client.queue_header(header) {
if let Err(e) = client.queue_header(header) {
debug!(target: "sync", "Found bad header ({:?}). Reset to search state.", e);
self.begin_search(&mut state);
@ -492,7 +495,7 @@ impl<L: LightChainClient> LightSync<L> {
}
// public API
impl<L: LightChainClient> LightSync<L> {
impl<L: AsLightClient> LightSync<L> {
/// Create a new instance of `LightSync`.
///
/// This won't do anything until registered as a handler