diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index ea4660abc..a113b4367 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -16,24 +16,22 @@ //! Light client implementation. Stores data from light sync +use std::sync::Arc; + use ethcore::block_import_error::BlockImportError; use ethcore::block_status::BlockStatus; use ethcore::client::ClientReport; +use ethcore::engines::Engine; use ethcore::ids::BlockId; use ethcore::header::Header; use ethcore::verification::queue::{self, HeaderQueue}; -use ethcore::transaction::{PendingTransaction, Condition as TransactionCondition}; use ethcore::blockchain_info::BlockChainInfo; use ethcore::spec::Spec; use ethcore::service::ClientIoMessage; use ethcore::encoded; use io::IoChannel; -use util::hash::{H256, H256FastMap}; -use util::{Bytes, Mutex, RwLock}; - -use provider::Provider; -use request; +use util::{Bytes, H256, Mutex, RwLock}; use self::header_chain::HeaderChain; @@ -58,6 +56,12 @@ pub trait LightChainClient: Send + Sync { /// parent queued prior. fn queue_header(&self, header: Header) -> Result; + /// Attempt to get block header by block id. + fn block_header(&self, id: BlockId) -> Option; + + /// Get the best block header. + fn best_block_header(&self) -> encoded::Header; + /// Query whether a block is known. fn is_known(&self, hash: &H256) -> bool; @@ -74,11 +78,26 @@ pub trait LightChainClient: Send + Sync { fn cht_root(&self, i: usize) -> Option; } +/// Something which can be treated as a `LightChainClient`. +pub trait AsLightClient { + /// The kind of light client this can be treated as. + type Client: LightChainClient; + + /// Access the underlying light client. + fn as_light_client(&self) -> &Self::Client; +} + +impl AsLightClient for T { + type Client = Self; + + fn as_light_client(&self) -> &Self { self } +} + /// Light client implementation. pub struct Client { queue: HeaderQueue, + engine: Arc, chain: HeaderChain, - tx_pool: Mutex>, report: RwLock, import_lock: Mutex<()>, } @@ -88,8 +107,8 @@ impl Client { pub fn new(config: Config, spec: &Spec, io_channel: IoChannel) -> Self { Client { queue: HeaderQueue::new(config.queue, spec.engine.clone(), io_channel, true), + engine: spec.engine.clone(), chain: HeaderChain::new(&::rlp::encode(&spec.genesis_header())), - tx_pool: Mutex::new(Default::default()), report: RwLock::new(ClientReport::default()), import_lock: Mutex::new(()), } @@ -100,25 +119,6 @@ impl Client { self.queue.import(header).map_err(Into::into) } - /// Import a local transaction. - pub fn import_own_transaction(&self, tx: PendingTransaction) { - self.tx_pool.lock().insert(tx.transaction.hash(), tx); - } - - /// Fetch a vector of all pending transactions. - pub fn ready_transactions(&self) -> Vec { - let best = self.chain.best_header(); - self.tx_pool.lock() - .values() - .filter(|t| match t.condition { - Some(TransactionCondition::Number(x)) => x <= best.number(), - Some(TransactionCondition::Timestamp(x)) => x <= best.timestamp(), - None => true, - }) - .cloned() - .collect() - } - /// Inquire about the status of a given header. pub fn status(&self, hash: &H256) -> BlockStatus { match self.queue.status(hash) { @@ -159,6 +159,11 @@ impl Client { self.chain.block_header(id) } + /// Get the best block header. + pub fn best_block_header(&self) -> encoded::Header { + self.chain.best_header() + } + /// Flush the header queue. pub fn flush_queue(&self) { self.queue.flush() @@ -207,6 +212,11 @@ impl Client { self.chain.heap_size_of_children() } + + /// Get a handle to the verification engine. + pub fn engine(&self) -> &Engine { + &*self.engine + } } impl LightChainClient for Client { @@ -216,6 +226,14 @@ impl LightChainClient for Client { self.import_header(header) } + fn block_header(&self, id: BlockId) -> Option { + Client::block_header(self, id) + } + + fn best_block_header(&self) -> encoded::Header { + Client::best_block_header(self) + } + fn is_known(&self, hash: &H256) -> bool { self.status(hash) == BlockStatus::InChain } @@ -237,8 +255,8 @@ impl LightChainClient for Client { } } -// dummy implementation -- may draw from canonical cache further on. -impl Provider for Client { +// dummy implementation, should be removed when a `TestClient` is added. +impl ::provider::Provider for Client { fn chain_info(&self) -> BlockChainInfo { Client::chain_info(self) } @@ -263,19 +281,19 @@ impl Provider for Client { None } - fn state_proof(&self, _req: request::StateProof) -> Vec { + fn state_proof(&self, _req: ::request::StateProof) -> Vec { Vec::new() } - fn contract_code(&self, _req: request::ContractCode) -> Bytes { + fn contract_code(&self, _req: ::request::ContractCode) -> Bytes { Vec::new() } - fn header_proof(&self, _req: request::HeaderProof) -> Option<(encoded::Header, Vec)> { + fn header_proof(&self, _req: ::request::HeaderProof) -> Option<(encoded::Header, Vec)> { None } - fn ready_transactions(&self) -> Vec { + fn ready_transactions(&self) -> Vec<::ethcore::transaction::PendingTransaction> { Vec::new() } } diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index 6236ba118..94d267c7a 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -36,6 +36,7 @@ pub mod client; pub mod cht; pub mod net; pub mod on_demand; +pub mod transaction_queue; #[cfg(not(feature = "ipc"))] pub mod provider; @@ -54,6 +55,7 @@ pub mod remote { mod types; pub use self::provider::Provider; +pub use self::transaction_queue::TransactionQueue; pub use types::les_request as request; #[macro_use] diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs index 4721caa73..caade3857 100644 --- a/ethcore/light/src/provider.rs +++ b/ethcore/light/src/provider.rs @@ -17,15 +17,19 @@ //! A provider for the LES protocol. This is typically a full node, who can //! give as much data as necessary to its peers. +use std::sync::Arc; + use ethcore::blockchain_info::BlockChainInfo; use ethcore::client::{BlockChainClient, ProvingBlockChainClient}; use ethcore::transaction::PendingTransaction; use ethcore::ids::BlockId; use ethcore::encoded; +use util::{Bytes, RwLock, H256}; use cht::{self, BlockInfo}; +use client::{LightChainClient, AsLightClient}; +use transaction_queue::TransactionQueue; -use util::{Bytes, H256}; use request; @@ -284,6 +288,75 @@ impl Provider for T { } } +/// The light client "provider" implementation. This wraps a `LightClient` and +/// a light transaction queue. +pub struct LightProvider { + client: Arc, + txqueue: Arc>, +} + +impl LightProvider { + /// Create a new `LightProvider` from the given client and transaction queue. + pub fn new(client: Arc, txqueue: Arc>) -> Self { + LightProvider { + client: client, + txqueue: txqueue, + } + } +} + +// TODO: draw from cache (shared between this and the RPC layer) +impl Provider for LightProvider { + fn chain_info(&self) -> BlockChainInfo { + self.client.as_light_client().chain_info() + } + + fn reorg_depth(&self, _a: &H256, _b: &H256) -> Option { + None + } + + fn earliest_state(&self) -> Option { + None + } + + fn block_header(&self, id: BlockId) -> Option { + self.client.as_light_client().block_header(id) + } + + fn block_body(&self, _id: BlockId) -> Option { + None + } + + fn block_receipts(&self, _hash: &H256) -> Option { + None + } + + fn state_proof(&self, _req: request::StateProof) -> Vec { + Vec::new() + } + + fn contract_code(&self, _req: request::ContractCode) -> Bytes { + Vec::new() + } + + fn header_proof(&self, _req: request::HeaderProof) -> Option<(encoded::Header, Vec)> { + None + } + + fn ready_transactions(&self) -> Vec { + let chain_info = self.chain_info(); + self.txqueue.read().ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp) + } +} + +impl AsLightClient for LightProvider { + type Client = L::Client; + + fn as_light_client(&self) -> &L::Client { + self.client.as_light_client() + } +} + #[cfg(test)] mod tests { use ethcore::client::{EachBlockWith, TestBlockChainClient}; diff --git a/ethcore/light/src/transaction_queue.rs b/ethcore/light/src/transaction_queue.rs new file mode 100644 index 000000000..8ca6a64f6 --- /dev/null +++ b/ethcore/light/src/transaction_queue.rs @@ -0,0 +1,474 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Light Transaction Queue. +//! +//! Manages local transactions, +//! but stores all local transactions, removing only on invalidated nonce. +//! +//! Under the assumption that light nodes will have a relatively limited set of +//! accounts for which they create transactions, this queue is structured in an +//! address-wise manner. + +use std::collections::{BTreeMap, HashMap}; +use std::collections::hash_map::Entry; + +use ethcore::error::TransactionError; +use ethcore::transaction::{Condition, PendingTransaction, SignedTransaction}; +use ethcore::transaction_import::TransactionImportResult; +use util::{Address, U256, H256, H256FastMap}; + +// Knowledge of an account's current nonce. +#[derive(Debug, Clone, PartialEq, Eq)] +enum CurrentNonce { + // Assumed current nonce. + Assumed(U256), + // Known current nonce. + Known(U256), +} + +impl CurrentNonce { + // whether this nonce is assumed + fn is_assumed(&self) -> bool { + match *self { + CurrentNonce::Assumed(_) => true, + CurrentNonce::Known(_) => false, + } + } + + // whether this nonce is known for certain from an external source. + fn is_known(&self) -> bool { + !self.is_assumed() + } + + // the current nonce's value. + fn value(&self) -> &U256 { + match *self { + CurrentNonce::Assumed(ref val) => val, + CurrentNonce::Known(ref val) => val, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct TransactionInfo { + hash: H256, + nonce: U256, + condition: Option, +} + +impl<'a> From<&'a PendingTransaction> for TransactionInfo { + fn from(tx: &'a PendingTransaction) -> Self { + TransactionInfo { + hash: tx.hash(), + nonce: tx.nonce.clone(), + condition: tx.condition.clone(), + } + } +} + +// transactions associated with a specific account. +#[derive(Debug, Clone, PartialEq, Eq)] +struct AccountTransactions { + // believed current nonce (gotten from initial given TX or `cull` calls). + cur_nonce: CurrentNonce, + current: Vec, // ordered "current" transactions (cur_nonce onwards) + future: BTreeMap, // "future" transactions. +} + +impl AccountTransactions { + fn is_empty(&self) -> bool { + self.current.is_empty() && self.future.is_empty() + } + + fn next_nonce(&self) -> U256 { + self.current.last().map(|last| last.nonce + 1.into()) + .unwrap_or_else(|| *self.cur_nonce.value()) + } + + // attempt to move transactions from the future queue into the current queue. + fn adjust_future(&mut self) { + let mut next_nonce = self.next_nonce(); + + loop { + match self.future.remove(&next_nonce) { + Some(tx) => self.current.push(tx), + None => break, + } + + next_nonce = next_nonce + 1.into(); + } + } +} + +/// Light transaction queue. See module docs for more details. +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct TransactionQueue { + by_account: HashMap, + by_hash: H256FastMap, +} + +impl TransactionQueue { + /// Import a pending transaction to be queued. + pub fn import(&mut self, tx: PendingTransaction) -> Result { + let sender = tx.sender(); + let hash = tx.hash(); + let nonce = tx.nonce; + let tx_info = TransactionInfo::from(&tx); + + if self.by_hash.contains_key(&hash) { return Err(TransactionError::AlreadyImported) } + + let res = match self.by_account.entry(sender) { + Entry::Vacant(entry) => { + entry.insert(AccountTransactions { + cur_nonce: CurrentNonce::Assumed(nonce), + current: vec![tx_info], + future: BTreeMap::new(), + }); + + TransactionImportResult::Current + } + Entry::Occupied(mut entry) => { + let acct_txs = entry.get_mut(); + if &nonce < acct_txs.cur_nonce.value() { + // don't accept txs from before known current nonce. + if acct_txs.cur_nonce.is_known() { + return Err(TransactionError::Old) + } + + // lower our assumption until corrected later. + acct_txs.cur_nonce = CurrentNonce::Assumed(nonce); + } + + match acct_txs.current.binary_search_by(|x| x.nonce.cmp(&nonce)) { + Ok(idx) => { + trace!(target: "txqueue", "Replacing existing transaction from {} with nonce {}", + sender, nonce); + + let old = ::std::mem::replace(&mut acct_txs.current[idx], tx_info); + self.by_hash.remove(&old.hash); + + TransactionImportResult::Current + } + Err(idx) => { + let cur_len = acct_txs.current.len(); + let incr_nonce = nonce + 1.into(); + + // current is sorted with one tx per nonce, + // so if a tx with given nonce wasn't found that means it is either + // earlier in nonce than all other "current" transactions or later. + assert!(idx == 0 || idx == cur_len); + + if idx == 0 && acct_txs.current.first().map_or(false, |f| f.nonce != incr_nonce) { + let old_cur = ::std::mem::replace(&mut acct_txs.current, vec![tx_info]); + + trace!(target: "txqueue", "Moving {} transactions with nonce > {} to future", + old_cur.len(), incr_nonce); + + for future in old_cur { + let future_nonce = future.nonce; + acct_txs.future.insert(future_nonce, future); + } + + TransactionImportResult::Current + } else if idx == cur_len && acct_txs.current.last().map_or(false, |f| f.nonce + 1.into() != nonce) { + trace!(target: "txqueue", "Queued future transaction for {}, nonce={}", sender, nonce); + let future_nonce = nonce; + acct_txs.future.insert(future_nonce, tx_info); + + TransactionImportResult::Future + } else { + trace!(target: "txqueue", "Queued current transaction for {}, nonce={}", sender, nonce); + + // insert, then check if we've filled any gaps. + acct_txs.current.insert(idx, tx_info); + acct_txs.adjust_future(); + + TransactionImportResult::Current + } + } + } + } + }; + + self.by_hash.insert(hash, tx); + Ok(res) + } + + /// Get pending transaction by hash. + pub fn transaction(&self, hash: &H256) -> Option { + self.by_hash.get(hash).map(|tx| (&**tx).clone()) + } + + /// Get the next nonce for a given address based on what's within the queue. + /// If the address has no queued transactions, then `None` will be returned + /// and the next nonce will have to be deduced via other means. + pub fn next_nonce(&self, address: &Address) -> Option { + self.by_account.get(address).map(AccountTransactions::next_nonce) + } + + /// Get all transactions ready to be propagated. + /// `best_block_number` and `best_block_timestamp` are used to filter out conditionally + /// propagated transactions. + /// + /// Returned transactions are batched by sender, in order of ascending nonce. + pub fn ready_transactions(&self, best_block_number: u64, best_block_timestamp: u64) -> Vec { + self.by_account.values() + .flat_map(|acct_txs| { + acct_txs.current.iter().take_while(|tx| match tx.condition { + None => true, + Some(Condition::Number(blk_num)) => blk_num <= best_block_number, + Some(Condition::Timestamp(time)) => time <= best_block_timestamp, + }).map(|info| info.hash) + }) + .filter_map(|hash| match self.by_hash.get(&hash) { + Some(tx) => Some(tx.clone()), + None => { + warn!(target: "txqueue", "Inconsistency detected between `by_hash` and `by_account`: {} not stored.", + hash); + None + } + }) + .collect() + } + + /// Addresses for which we store transactions. + pub fn queued_senders(&self) -> Vec
{ + self.by_account.keys().cloned().collect() + } + + /// Cull out all transactions by the given address which are invalidated by the given nonce. + pub fn cull(&mut self, address: Address, cur_nonce: U256) { + let mut removed_hashes = vec![]; + if let Entry::Occupied(mut entry) = self.by_account.entry(address) { + { + let acct_txs = entry.get_mut(); + acct_txs.cur_nonce = CurrentNonce::Known(cur_nonce); + + // cull old "future" keys. + let old_future: Vec<_> = acct_txs.future.keys().take_while(|&&k| k < cur_nonce).cloned().collect(); + + for old in old_future { + let hash = acct_txs.future.remove(&old) + .expect("key extracted from keys iterator; known to exist; qed") + .hash; + removed_hashes.push(hash); + } + + // then cull from "current". + let valid_pos = acct_txs.current.iter().position(|tx| tx.nonce >= cur_nonce); + match valid_pos { + None => + removed_hashes.extend(acct_txs.current.drain(..).map(|tx| tx.hash)), + Some(valid) => + removed_hashes.extend(acct_txs.current.drain(..valid).map(|tx| tx.hash)), + } + + // now try and move stuff out of future into current. + acct_txs.adjust_future(); + } + + if entry.get_mut().is_empty() { + trace!(target: "txqueue", "No more queued transactions for {} after nonce {}", + address, cur_nonce); + entry.remove(); + } + } + + trace!(target: "txqueue", "Culled {} old transactions from sender {} (nonce={})", + removed_hashes.len(), address, cur_nonce); + + for hash in removed_hashes { + self.by_hash.remove(&hash); + } + } +} + +#[cfg(test)] +mod tests { + use super::TransactionQueue; + use util::Address; + use ethcore::transaction::{Transaction, PendingTransaction, Condition}; + + #[test] + fn queued_senders() { + let sender = Address::default(); + let mut txq = TransactionQueue::default(); + let tx = Transaction::default().fake_sign(sender); + + txq.import(tx.into()).unwrap(); + + assert_eq!(txq.queued_senders(), vec![sender]); + + txq.cull(sender, 1.into()); + + assert_eq!(txq.queued_senders(), vec![]); + assert!(txq.by_hash.is_empty()); + } + + #[test] + fn next_nonce() { + let sender = Address::default(); + let mut txq = TransactionQueue::default(); + + for i in (0..5).chain(10..15) { + let mut tx = Transaction::default(); + tx.nonce = i.into(); + + let tx = tx.fake_sign(sender); + + txq.import(tx.into()).unwrap(); + } + + // current: 0..5, future: 10..15 + assert_eq!(txq.ready_transactions(0, 0).len(), 5); + assert_eq!(txq.next_nonce(&sender).unwrap(), 5.into()); + + txq.cull(sender, 8.into()); + + // current: empty, future: 10..15 + assert_eq!(txq.ready_transactions(0, 0).len(), 0); + assert_eq!(txq.next_nonce(&sender).unwrap(), 8.into()); + + txq.cull(sender, 10.into()); + + // current: 10..15, future: empty + assert_eq!(txq.ready_transactions(0, 0).len(), 5); + assert_eq!(txq.next_nonce(&sender).unwrap(), 15.into()); + } + + #[test] + fn current_to_future() { + let sender = Address::default(); + let mut txq = TransactionQueue::default(); + + for i in 5..10 { + let mut tx = Transaction::default(); + tx.nonce = i.into(); + + let tx = tx.fake_sign(sender); + + txq.import(tx.into()).unwrap(); + } + + assert_eq!(txq.ready_transactions(0, 0).len(), 5); + assert_eq!(txq.next_nonce(&sender).unwrap(), 10.into()); + + for i in 0..3 { + let mut tx = Transaction::default(); + tx.nonce = i.into(); + + let tx = tx.fake_sign(sender); + + txq.import(tx.into()).unwrap(); + } + + assert_eq!(txq.ready_transactions(0, 0).len(), 3); + assert_eq!(txq.next_nonce(&sender).unwrap(), 3.into()); + + for i in 3..5 { + let mut tx = Transaction::default(); + tx.nonce = i.into(); + + let tx = tx.fake_sign(sender); + + txq.import(tx.into()).unwrap(); + } + + assert_eq!(txq.ready_transactions(0, 0).len(), 10); + assert_eq!(txq.next_nonce(&sender).unwrap(), 10.into()); + } + + #[test] + fn conditional() { + let mut txq = TransactionQueue::default(); + let sender = Address::default(); + + for i in 0..5 { + let mut tx = Transaction::default(); + tx.nonce = i.into(); + let tx = tx.fake_sign(sender); + + txq.import(match i { + 3 => PendingTransaction::new(tx, Some(Condition::Number(100))), + 4 => PendingTransaction::new(tx, Some(Condition::Timestamp(1234))), + _ => tx.into(), + }).unwrap(); + } + + assert_eq!(txq.ready_transactions(0, 0).len(), 3); + assert_eq!(txq.ready_transactions(0, 1234).len(), 3); + assert_eq!(txq.ready_transactions(100, 0).len(), 4); + assert_eq!(txq.ready_transactions(100, 1234).len(), 5); + } + + #[test] + fn cull_from_future() { + let sender = Address::default(); + let mut txq = TransactionQueue::default(); + + for i in (0..1).chain(3..10) { + let mut tx = Transaction::default(); + tx.nonce = i.into(); + + let tx = tx.fake_sign(sender); + + txq.import(tx.into()).unwrap(); + } + + txq.cull(sender, 6.into()); + + assert_eq!(txq.ready_transactions(0, 0).len(), 4); + assert_eq!(txq.next_nonce(&sender).unwrap(), 10.into()); + } + + #[test] + fn import_old() { + let sender = Address::default(); + let mut txq = TransactionQueue::default(); + + let mut tx_a = Transaction::default(); + tx_a.nonce = 3.into(); + + let mut tx_b = Transaction::default(); + tx_b.nonce = 2.into(); + + txq.import(tx_a.fake_sign(sender).into()).unwrap(); + txq.cull(sender, 3.into()); + + assert!(txq.import(tx_b.fake_sign(sender).into()).is_err()) + } + + #[test] + fn replace_is_removed() { + let sender = Address::default(); + let mut txq = TransactionQueue::default(); + + let tx_b: PendingTransaction = Transaction::default().fake_sign(sender).into(); + let tx_a: PendingTransaction = { + let mut tx_a = Transaction::default(); + tx_a.gas_price = tx_b.gas_price + 1.into(); + tx_a.fake_sign(sender).into() + }; + + let hash = tx_a.hash(); + + txq.import(tx_a).unwrap(); + txq.import(tx_b).unwrap(); + + assert!(txq.transaction(&hash).is_none()); + } +} diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index 8900593ac..de0207d79 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -18,14 +18,18 @@ use std::fmt::Debug; use std::ops::Deref; -use std::sync::Weak; +use std::sync::{Arc, Weak}; use futures::{future, Future, BoxFuture}; +use light::client::LightChainClient; +use light::on_demand::{request, OnDemand}; +use light::TransactionQueue as LightTransactionQueue; use rlp::{self, Stream}; -use util::{Address, H520, H256, U256, Uint, Bytes}; +use util::{Address, H520, H256, U256, Uint, Bytes, RwLock}; use util::sha3::Hashable; use ethkey::Signature; +use ethsync::LightSync; use ethcore::miner::MinerService; use ethcore::client::MiningBlockChainClient; use ethcore::transaction::{Action, SignedTransaction, PendingTransaction, Transaction}; @@ -55,7 +59,7 @@ pub trait Dispatcher: Send + Sync + Clone { -> BoxFuture; /// Sign the given transaction request without dispatching, fetching appropriate nonce. - fn sign(&self, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith) + fn sign(&self, accounts: Arc, filled: FilledTransactionRequest, password: SignWith) -> BoxFuture, Error>; /// "Dispatch" a local transaction. @@ -108,13 +112,13 @@ impl Dispatcher for FullDispatcher, filled: FilledTransactionRequest, password: SignWith) -> BoxFuture, Error> { let (client, miner) = (take_weakf!(self.client), take_weakf!(self.miner)); let network_id = client.signing_network_id(); let address = filled.from; - future::ok({ + future::done({ let t = Transaction { nonce: filled.nonce .or_else(|| miner @@ -129,31 +133,15 @@ impl Dispatcher for FullDispatcher Dispatcher for FullDispatcher, + client: Arc, + on_demand: Arc, + transaction_queue: Arc>, +} + +impl LightDispatcher { + /// Create a new `LightDispatcher` from its requisite parts. + /// + /// For correct operation, the OnDemand service is assumed to be registered as a network handler, + pub fn new( + sync: Arc, + client: Arc, + on_demand: Arc, + transaction_queue: Arc>, + ) -> Self { + LightDispatcher { + sync: sync, + client: client, + on_demand: on_demand, + transaction_queue: transaction_queue, + } + } +} + +impl Dispatcher for LightDispatcher { + fn fill_optional_fields(&self, request: TransactionRequest, default_sender: Address) + -> BoxFuture + { + let request = request; + let gas_limit = self.client.best_block_header().gas_limit(); + + future::ok(FilledTransactionRequest { + from: request.from.unwrap_or(default_sender), + used_default_from: request.from.is_none(), + to: request.to, + nonce: request.nonce, + gas_price: request.gas_price.unwrap_or_else(|| 21_000_000.into()), // TODO: fetch corpus from network. + gas: request.gas.unwrap_or_else(|| gas_limit / 3.into()), + value: request.value.unwrap_or_else(|| 0.into()), + data: request.data.unwrap_or_else(Vec::new), + condition: request.condition, + }).boxed() + } + + fn sign(&self, accounts: Arc, filled: FilledTransactionRequest, password: SignWith) + -> BoxFuture, Error> + { + let network_id = None; // TODO: fetch from client. + let address = filled.from; + let best_header = self.client.best_block_header(); + + let with_nonce = move |filled: FilledTransactionRequest, nonce| { + let t = Transaction { + nonce: nonce, + action: filled.to.map_or(Action::Create, Action::Call), + gas: filled.gas, + gas_price: filled.gas_price, + value: filled.value, + data: filled.data, + }; + + if accounts.is_hardware_address(address) { + return hardware_signature(&*accounts, address, t, network_id).map(WithToken::No) + } + + let hash = t.hash(network_id); + let signature = signature(&*accounts, address, hash, password)?; + + Ok(signature.map(|sig| { + SignedTransaction::new(t.with_signature(sig, network_id)) + .expect("Transaction was signed by AccountsProvider; it never produces invalid signatures; qed") + })) + }; + + // fast path where we don't go to network; nonce provided or can be gotten from queue. + let maybe_nonce = filled.nonce.or_else(|| self.transaction_queue.read().next_nonce(&address)); + if let Some(nonce) = maybe_nonce { + return future::done(with_nonce(filled, nonce)).boxed() + } + + let nonce_future = self.sync.with_context(|ctx| self.on_demand.account(ctx, request::Account { + header: best_header, + address: address, + })); + + let nonce_future = match nonce_future { + Some(x) => x, + None => return future::err(errors::no_light_peers()).boxed() + }; + + nonce_future + .map_err(|_| errors::no_light_peers()) + .and_then(move |acc| with_nonce(filled, acc.nonce)) + .boxed() + } + + fn dispatch_transaction(&self, signed_transaction: PendingTransaction) -> Result { + let hash = signed_transaction.transaction.hash(); + + self.transaction_queue.write().import(signed_transaction) + .map_err(Into::into) + .map_err(errors::from_transaction_error) + .map(|_| hash) + } +} + /// default MAC to use. pub const DEFAULT_MAC: [u8; 2] = [0, 0]; @@ -251,7 +350,7 @@ impl From<(T, Option)> for WithToken { /// Execute a confirmation payload. pub fn execute( dispatcher: D, - accounts: &AccountProvider, + accounts: Arc, payload: ConfirmationPayload, pass: SignWith ) -> BoxFuture, Error> { @@ -281,7 +380,7 @@ pub fn execute( format!("\x19Ethereum Signed Message:\n{}", data.len()) .into_bytes(); message_data.append(&mut data); - let res = signature(accounts, address, message_data.sha3(), pass) + let res = signature(&accounts, address, message_data.sha3(), pass) .map(|result| result .map(|rsv| { let mut vrs = [0u8; 65]; @@ -297,7 +396,7 @@ pub fn execute( future::done(res).boxed() }, ConfirmationPayload::Decrypt(address, data) => { - let res = decrypt(accounts, address, data, pass) + let res = decrypt(&accounts, address, data, pass) .map(|result| result .map(RpcBytes) .map(ConfirmationResponse::Decrypt) @@ -318,6 +417,27 @@ fn signature(accounts: &AccountProvider, address: Address, hash: H256, password: }) } +// obtain a hardware signature from the given account. +fn hardware_signature(accounts: &AccountProvider, address: Address, t: Transaction, network_id: Option) + -> Result +{ + debug_assert!(accounts.is_hardware_address(address)); + + let mut stream = rlp::RlpStream::new(); + t.rlp_append_unsigned_transaction(&mut stream, network_id); + let signature = accounts.sign_with_hardware(address, &stream.as_raw()) + .map_err(|e| { + debug!(target: "miner", "Error signing transaction with hardware wallet: {}", e); + errors::account("Error signing transaction with hardware wallet", e) + })?; + + SignedTransaction::new(t.with_signature(signature, network_id)) + .map_err(|e| { + debug!(target: "miner", "Hardware wallet has produced invalid signature: {}", e); + errors::account("Invalid signature generated", e) + }) +} + fn decrypt(accounts: &AccountProvider, address: Address, msg: Bytes, password: SignWith) -> Result, Error> { match password.clone() { SignWith::Nothing => accounts.decrypt(address, None, &DEFAULT_MAC, &msg).map(WithToken::No), diff --git a/rpc/src/v1/helpers/errors.rs b/rpc/src/v1/helpers/errors.rs index e1074c598..b58999f84 100644 --- a/rpc/src/v1/helpers/errors.rs +++ b/rpc/src/v1/helpers/errors.rs @@ -49,6 +49,7 @@ mod codes { pub const COMPILATION_ERROR: i64 = -32050; pub const ENCRYPTION_ERROR: i64 = -32055; pub const FETCH_ERROR: i64 = -32060; + pub const NO_LIGHT_PEERS: i64 = -32065; } pub fn unimplemented(details: Option) -> Error { @@ -308,3 +309,11 @@ pub fn unknown_block() -> Error { data: None, } } + +pub fn no_light_peers() -> Error { + Error { + code: ErrorCode::ServerError(codes::NO_LIGHT_PEERS), + message: "No light peers who can serve data".into(), + data: None, + } +} diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index c321657d2..944b419f7 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -25,16 +25,18 @@ use jsonrpc_core::Error; use jsonrpc_macros::Trailing; use light::client::Client as LightClient; -use light::cht; +use light::{cht, TransactionQueue}; use light::on_demand::{request, OnDemand}; use ethcore::account_provider::{AccountProvider, DappId}; use ethcore::basic_account::BasicAccount; use ethcore::encoded; use ethcore::ids::BlockId; +use ethcore::transaction::SignedTransaction; use ethsync::LightSync; +use rlp::{UntrustedRlp, View}; use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP}; -use util::U256; +use util::{RwLock, U256}; use futures::{future, Future, BoxFuture}; use futures::sync::oneshot; @@ -56,6 +58,7 @@ pub struct EthClient { sync: Arc, client: Arc, on_demand: Arc, + transaction_queue: Arc>, accounts: Arc, } @@ -76,12 +79,14 @@ impl EthClient { sync: Arc, client: Arc, on_demand: Arc, + transaction_queue: Arc>, accounts: Arc, ) -> Self { EthClient { sync: sync, client: client, on_demand: on_demand, + transaction_queue: transaction_queue, accounts: accounts, } } @@ -300,11 +305,27 @@ impl Eth for EthClient { } fn send_raw_transaction(&self, raw: Bytes) -> Result { - Err(errors::unimplemented(None)) + let best_header = self.client.best_block_header().decode(); + + UntrustedRlp::new(&raw.into_vec()).as_val() + .map_err(errors::from_rlp_error) + .and_then(|tx| { + self.client.engine().verify_transaction_basic(&tx, &best_header) + .map_err(errors::from_transaction_error)?; + + let signed = SignedTransaction::new(tx).map_err(errors::from_transaction_error)?; + let hash = signed.hash(); + + self.transaction_queue.write().import(signed.into()) + .map(|_| hash) + .map_err(Into::into) + .map_err(errors::from_transaction_error) + }) + .map(Into::into) } fn submit_transaction(&self, raw: Bytes) -> Result { - Err(errors::unimplemented(None)) + self.send_raw_transaction(raw) } fn call(&self, req: CallRequest, num: Trailing) -> Result { diff --git a/rpc/src/v1/impls/personal.rs b/rpc/src/v1/impls/personal.rs index fba058aee..2501e3f3a 100644 --- a/rpc/src/v1/impls/personal.rs +++ b/rpc/src/v1/impls/personal.rs @@ -113,7 +113,7 @@ impl Personal for PersonalClient { dispatcher.fill_optional_fields(request.into(), default) .and_then(move |filled| { let condition = filled.condition.clone().map(Into::into); - dispatcher.sign(&accounts, filled, SignWith::Password(password)) + dispatcher.sign(accounts, filled, SignWith::Password(password)) .map(|tx| tx.into_value()) .map(move |tx| PendingTransaction::new(tx, condition)) .map(move |tx| (tx, dispatcher)) diff --git a/rpc/src/v1/impls/signer.rs b/rpc/src/v1/impls/signer.rs index a94db94a0..ffd9f4108 100644 --- a/rpc/src/v1/impls/signer.rs +++ b/rpc/src/v1/impls/signer.rs @@ -52,7 +52,7 @@ impl SignerClient { } fn confirm_internal(&self, id: U256, modification: TransactionModification, f: F) -> BoxFuture, Error> where - F: FnOnce(D, &AccountProvider, ConfirmationPayload) -> T, + F: FnOnce(D, Arc, ConfirmationPayload) -> T, T: IntoFuture, Error=Error>, T::Future: Send + 'static { @@ -87,7 +87,7 @@ impl SignerClient { request.condition = condition.clone().map(Into::into); } } - let fut = f(dispatcher, &*accounts, payload); + let fut = f(dispatcher, accounts, payload); fut.into_future().then(move |result| { // Execute if let Ok(ref response) = result { diff --git a/rpc/src/v1/impls/signing.rs b/rpc/src/v1/impls/signing.rs index 9dae40730..5c52df79a 100644 --- a/rpc/src/v1/impls/signing.rs +++ b/rpc/src/v1/impls/signing.rs @@ -95,7 +95,7 @@ impl SigningQueueClient { .and_then(move |payload| { let sender = payload.sender(); if accounts.is_unlocked(sender) { - dispatch::execute(dispatcher, &accounts, payload, dispatch::SignWith::Nothing) + dispatch::execute(dispatcher, accounts, payload, dispatch::SignWith::Nothing) .map(|v| v.into_value()) .map(DispatchResult::Value) .boxed() diff --git a/rpc/src/v1/impls/signing_unsafe.rs b/rpc/src/v1/impls/signing_unsafe.rs index b4900f7ec..1d778404c 100644 --- a/rpc/src/v1/impls/signing_unsafe.rs +++ b/rpc/src/v1/impls/signing_unsafe.rs @@ -61,7 +61,7 @@ impl SigningUnsafeClient { let dis = self.dispatcher.clone(); dispatch::from_rpc(payload, default, &dis) .and_then(move |payload| { - dispatch::execute(dis, &accounts, payload, dispatch::SignWith::Nothing) + dispatch::execute(dis, accounts, payload, dispatch::SignWith::Nothing) }) .map(|v| v.into_value()) .boxed() diff --git a/sync/src/api.rs b/sync/src/api.rs index 5b97bc566..9b1ace73b 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -34,7 +34,7 @@ use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig}; use std::str::FromStr; use parking_lot::RwLock; use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT}; -use light::client::LightChainClient; +use light::client::AsLightClient; use light::Provider; use light::net::{self as light_net, LightProtocol, Params as LightParams, Capabilities, Handler as LightHandler, EventContext}; @@ -642,7 +642,7 @@ pub struct LightSync { impl LightSync { /// Create a new light sync service. pub fn new(params: LightSyncParams) -> Result - where L: LightChainClient + Provider + 'static + where L: AsLightClient + Provider + Sync + Send + 'static { use light_sync::LightSync as SyncHandler; diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index 685cb24be..fba89dd7b 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -36,7 +36,7 @@ use std::collections::HashMap; use std::mem; use std::sync::Arc; -use light::client::LightChainClient; +use light::client::{AsLightClient, LightChainClient}; use light::net::{ Announcement, Handler, BasicContext, EventContext, Capabilities, ReqId, Status, @@ -106,8 +106,9 @@ impl AncestorSearch { } fn process_response(self, ctx: &ResponseContext, client: &L) -> AncestorSearch - where L: LightChainClient + where L: AsLightClient { + let client = client.as_light_client(); let first_num = client.chain_info().first_block_number.unwrap_or(0); match self { AncestorSearch::Awaiting(id, start, req) => { @@ -203,7 +204,7 @@ impl<'a> ResponseContext for ResponseCtx<'a> { } /// Light client synchronization manager. See module docs for more details. -pub struct LightSync { +pub struct LightSync { best_seen: Mutex>, // best seen block on the network. peers: RwLock>>, // peers which are relevant to synchronization. client: Arc, @@ -211,7 +212,7 @@ pub struct LightSync { state: Mutex, } -impl Handler for LightSync { +impl Handler for LightSync { fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { if !capabilities.serve_headers { trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer()); @@ -344,7 +345,7 @@ impl Handler for LightSync { } // private helpers -impl LightSync { +impl LightSync { // Begins a search for the common ancestor and our best block. // does not lock state, instead has a mutable reference to it passed. fn begin_search(&self, state: &mut SyncState) { @@ -354,8 +355,8 @@ impl LightSync { return; } - self.client.flush_queue(); - let chain_info = self.client.chain_info(); + self.client.as_light_client().flush_queue(); + let chain_info = self.client.as_light_client().chain_info(); trace!(target: "sync", "Beginning search for common ancestor from {:?}", (chain_info.best_block_number, chain_info.best_block_hash)); @@ -366,8 +367,10 @@ impl LightSync { fn maintain_sync(&self, ctx: &BasicContext) { const DRAIN_AMOUNT: usize = 128; + let client = self.client.as_light_client(); + let chain_info = client.chain_info(); + let mut state = self.state.lock(); - let chain_info = self.client.chain_info(); debug!(target: "sync", "Maintaining sync ({:?})", &*state); // drain any pending blocks into the queue. @@ -376,7 +379,7 @@ impl LightSync { 'a: loop { - if self.client.queue_info().is_full() { break } + if client.queue_info().is_full() { break } *state = match mem::replace(&mut *state, SyncState::Idle) { SyncState::Rounds(round) @@ -388,7 +391,7 @@ impl LightSync { trace!(target: "sync", "Drained {} headers to import", sink.len()); for header in sink.drain(..) { - if let Err(e) = self.client.queue_header(header) { + if let Err(e) = client.queue_header(header) { debug!(target: "sync", "Found bad header ({:?}). Reset to search state.", e); self.begin_search(&mut state); @@ -492,7 +495,7 @@ impl LightSync { } // public API -impl LightSync { +impl LightSync { /// Create a new instance of `LightSync`. /// /// This won't do anything until registered as a handler