From 9592ccc0df66435816339d7103f3b9cbf3b47e3d Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 28 Mar 2016 18:11:00 +0200 Subject: [PATCH 1/2] Publish locally-made transactions to peers. --- rpc/src/v1/impls/eth.rs | 84 +++++++++++------------ rpc/src/v1/tests/helpers/sync_provider.rs | 5 +- sync/src/chain.rs | 48 +++++++++++++ sync/src/lib.rs | 10 ++- 4 files changed, 100 insertions(+), 47 deletions(-) diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 141593451..0fd3c95b4 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -184,6 +184,29 @@ impl EthClient data: request.data.map_or_else(Vec::new, |d| d.to_vec()) }.fake_sign(from) } + + fn dispatch_transaction(&self, signed_transaction: SignedTransaction, raw_transaction: Vec) -> Result { + let hash = signed_transaction.hash(); + + let import = { + let client = take_weak!(self.client); + take_weak!(self.miner).import_transactions(vec![signed_transaction], |a: &Address| AccountDetails { + nonce: client.nonce(a), + balance: client.balance(a), + }) + }; + + match import.into_iter().collect::, _>>() { + Ok(_) => { + take_weak!(self.sync).new_transaction(raw_transaction); + to_value(&hash) + } + Err(e) => { + warn!("Error sending transaction: {:?}", e); + to_value(&H256::zero()) + } + } + } } const MAX_QUEUE_SIZE_TO_MINE_ON: usize = 4; // because uncles go back 6. @@ -460,32 +483,19 @@ impl Eth for EthClient let accounts = take_weak!(self.accounts); match accounts.account_secret(&request.from) { Ok(secret) => { - let miner = take_weak!(self.miner); - let client = take_weak!(self.client); - - let transaction = EthTransaction { - nonce: request.nonce.unwrap_or_else(|| client.nonce(&request.from)), - action: request.to.map_or(Action::Create, Action::Call), - gas: request.gas.unwrap_or_else(default_gas), - gas_price: request.gas_price.unwrap_or_else(default_gas_price), - value: request.value.unwrap_or_else(U256::zero), - data: request.data.map_or_else(Vec::new, |d| d.to_vec()) + let signed_transaction = { + let client = take_weak!(self.client); + EthTransaction { + nonce: request.nonce.unwrap_or_else(|| client.nonce(&request.from)), + action: request.to.map_or(Action::Create, Action::Call), + gas: request.gas.unwrap_or_else(default_gas), + gas_price: request.gas_price.unwrap_or_else(default_gas_price), + value: request.value.unwrap_or_else(U256::zero), + data: request.data.map_or_else(Vec::new, |d| d.to_vec()), + }.sign(&secret) }; - - let signed_transaction = transaction.sign(&secret); - let hash = signed_transaction.hash(); - - let import = miner.import_transactions(vec![signed_transaction], |a: &Address| AccountDetails { - nonce: client.nonce(a), - balance: client.balance(a), - }); - match import.into_iter().collect::, _>>() { - Ok(_) => to_value(&hash), - Err(e) => { - warn!("Error sending transaction: {:?}", e); - to_value(&H256::zero()) - } - } + let raw_transaction = encode(&signed_transaction).to_vec(); + self.dispatch_transaction(signed_transaction, raw_transaction) }, Err(_) => { to_value(&H256::zero()) } } @@ -495,26 +505,10 @@ impl Eth for EthClient fn send_raw_transaction(&self, params: Params) -> Result { from_params::<(Bytes, )>(params) .and_then(|(raw_transaction, )| { - let decoded: Result = UntrustedRlp::new(&raw_transaction.to_vec()).as_val(); - match decoded { - Ok(signed_tx) => { - let miner = take_weak!(self.miner); - let client = take_weak!(self.client); - - let hash = signed_tx.hash(); - let import = miner.import_transactions(vec![signed_tx], |a: &Address| AccountDetails { - nonce: client.nonce(a), - balance: client.balance(a), - }); - match import.into_iter().collect::, _>>() { - Ok(_) => to_value(&hash), - Err(e) => { - warn!("Error sending transaction: {:?}", e); - to_value(&H256::zero()) - } - } - }, - Err(_) => { to_value(&H256::zero()) } + let raw_transaction = raw_transaction.to_vec(); + match UntrustedRlp::new(&raw_transaction).as_val() { + Ok(signed_transaction) => self.dispatch_transaction(signed_transaction, raw_transaction), + Err(_) => to_value(&H256::zero()), } }) } diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index 527974140..59188f0a7 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -16,7 +16,7 @@ //! Test implementation of SyncProvider. -use util::U256; +use util::{U256, Bytes}; use ethsync::{SyncProvider, SyncStatus, SyncState}; use std::sync::{RwLock}; @@ -59,5 +59,8 @@ impl SyncProvider for TestSyncProvider { fn status(&self) -> SyncStatus { self.status.read().unwrap().clone() } + + fn new_transaction(&self, _raw_transaction: Bytes) { + } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 41afbea1c..9194bf1d9 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -217,6 +217,10 @@ pub struct ChainSync { network_id: U256, /// Miner miner: Arc, + + /// Transactions to propagate + // TODO: reconsider where this is in the codebase - seems a little dodgy to have here. + transactions_to_send: Vec, } type RlpResponseResult = Result, PacketDecodeError>; @@ -243,6 +247,7 @@ impl ChainSync { max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, miner: miner, + transactions_to_send: vec![], } } @@ -938,6 +943,12 @@ impl ChainSync { sync.disable_peer(peer_id); } } + + /// Place a new transaction on the wire. + pub fn new_transaction(&mut self, raw_transaction: Bytes) { + self.transactions_to_send.push(raw_transaction); + } + /// Called when peer sends us new transactions fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { // accepting transactions once only fully synced @@ -1271,7 +1282,44 @@ impl ChainSync { sent } + /// propagates new transactions to all peers + fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize { + + // Early out of nobody to send to. + if self.peers.len() == 0 { + return 0; + } + + let mut packet = RlpStream::new_list(self.transactions_to_send.len()); + for tx in self.transactions_to_send.iter() { + packet.append_raw(tx, 1); + } + self.transactions_to_send.clear(); + let rlp = packet.out(); + + let lucky_peers = { + // sqrt(x)/x scaled to max u32 + let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; + let small = self.peers.len() < MIN_PEERS_PROPAGATION; + let lucky_peers = self.peers.iter() + .filter_map(|(&p, _)| if small || ::rand::random::() < fraction { Some(p.clone()) } else { None }) + .collect::>(); + + // taking at max of MAX_PEERS_PROPAGATION + lucky_peers.iter().map(|&id| id.clone()).take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::>() + }; + + let sent = lucky_peers.len(); + for peer_id in lucky_peers { + self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp.clone()); + } + sent + } + fn propagate_latest_blocks(&mut self, io: &mut SyncIo) { + if !self.transactions_to_send.is_empty() { + self.propagate_new_transactions(io); + } let chain_info = io.chain().chain_info(); if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { let blocks = self.propagate_blocks(&chain_info, io); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index a4f6eff38..ea4a1daea 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -66,7 +66,7 @@ use std::ops::*; use std::sync::*; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; use util::TimerToken; -use util::{U256, ONE_U256}; +use util::{U256, Bytes, ONE_U256}; use ethcore::client::Client; use ethcore::service::SyncMessage; use ethminer::Miner; @@ -101,6 +101,9 @@ impl Default for SyncConfig { pub trait SyncProvider: Send + Sync { /// Get sync status fn status(&self) -> SyncStatus; + + /// Note that a user has submitted a new transaction. + fn new_transaction(&self, raw_transaction: Bytes); } /// Ethereum network protocol handler @@ -140,6 +143,11 @@ impl SyncProvider for EthSync { fn status(&self) -> SyncStatus { self.sync.read().unwrap().status() } + + /// Note that a user has submitted a new transaction. + fn new_transaction(&self, raw_transaction: Bytes) { + self.sync.write().unwrap().new_transaction(raw_transaction); + } } impl NetworkProtocolHandler for EthSync { From 5d626c7dd3bc3710f1c7869f844e5252ed1b8ffd Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 28 Mar 2016 18:53:33 +0200 Subject: [PATCH 2/2] Use sensible gas price. --- miner/src/lib.rs | 5 ++++- miner/src/miner.rs | 5 +++++ miner/src/transaction_queue.rs | 5 +++++ rpc/src/v1/impls/eth.rs | 24 +++++++++++------------- 4 files changed, 25 insertions(+), 14 deletions(-) diff --git a/miner/src/lib.rs b/miner/src/lib.rs index c7a02bdc4..0daac48e6 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -64,7 +64,7 @@ mod transaction_queue; pub use transaction_queue::{TransactionQueue, AccountDetails}; pub use miner::{Miner}; -use util::{H256, Address, FixedHash, Bytes}; +use util::{H256, U256, Address, FixedHash, Bytes}; use ethcore::client::{BlockChainClient}; use ethcore::block::{ClosedBlock}; use ethcore::error::{Error}; @@ -107,6 +107,9 @@ pub trait MinerService : Send + Sync { /// Query pending transactions for hash fn transaction(&self, hash: &H256) -> Option; + + /// Suggested gas price + fn sensible_gas_price(&self) -> U256 { x!(20000000000u64) } } /// Mining status diff --git a/miner/src/miner.rs b/miner/src/miner.rs index bf38d6a42..70bf1711a 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -198,6 +198,11 @@ impl MinerService for Miner { } } + fn sensible_gas_price(&self) -> U256 { + // 10% above our minimum. + self.transaction_queue.lock().unwrap().minimal_gas_price().clone() * x!(110) / x!(100) + } + fn author(&self) -> Address { *self.author.read().unwrap() } diff --git a/miner/src/transaction_queue.rs b/miner/src/transaction_queue.rs index 8f2855836..b6dd9c829 100644 --- a/miner/src/transaction_queue.rs +++ b/miner/src/transaction_queue.rs @@ -312,6 +312,11 @@ impl TransactionQueue { } } + /// Get the minimal gas price. + pub fn minimal_gas_price(&self) -> &U256 { + &self.minimal_gas_price + } + /// Sets new gas price threshold for incoming transactions. /// Any transaction already imported to the queue is not affected. pub fn set_minimal_gas_price(&mut self, min_gas_price: U256) { diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 0fd3c95b4..dd241c9ec 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -31,7 +31,6 @@ use ethcore::client::*; use ethcore::block::IsBlock; use ethcore::views::*; use ethcore::ethereum::Ethash; -use ethcore::ethereum::denominations::shannon; use ethcore::transaction::{Transaction as EthTransaction, SignedTransaction, Action}; use self::ethash::SeedHashCompute; use v1::traits::{Eth, EthFilter}; @@ -44,10 +43,6 @@ fn default_gas() -> U256 { U256::from(21_000) } -fn default_gas_price() -> U256 { - shannon() * U256::from(20) -} - /// Eth rpc implementation. pub struct EthClient where C: BlockChainClient, @@ -173,16 +168,18 @@ impl EthClient } } - fn sign_call(client: &Arc, request: CallRequest) -> SignedTransaction { + fn sign_call(&self, request: CallRequest) -> Result { + let client = take_weak!(self.client); + let miner = take_weak!(self.miner); let from = request.from.unwrap_or(Address::zero()); - EthTransaction { + Ok(EthTransaction { nonce: request.nonce.unwrap_or_else(|| client.nonce(&from)), action: request.to.map_or(Action::Create, Action::Call), gas: request.gas.unwrap_or_else(default_gas), - gas_price: request.gas_price.unwrap_or_else(default_gas_price), + gas_price: request.gas_price.unwrap_or_else(|| miner.sensible_gas_price()), value: request.value.unwrap_or_else(U256::zero), data: request.data.map_or_else(Vec::new, |d| d.to_vec()) - }.fake_sign(from) + }.fake_sign(from)) } fn dispatch_transaction(&self, signed_transaction: SignedTransaction, raw_transaction: Vec) -> Result { @@ -290,7 +287,7 @@ impl Eth for EthClient fn gas_price(&self, params: Params) -> Result { match params { - Params::None => to_value(&default_gas_price()), + Params::None => to_value(&take_weak!(self.miner).sensible_gas_price()), _ => Err(Error::invalid_params()) } } @@ -485,11 +482,12 @@ impl Eth for EthClient Ok(secret) => { let signed_transaction = { let client = take_weak!(self.client); + let miner = take_weak!(self.miner); EthTransaction { nonce: request.nonce.unwrap_or_else(|| client.nonce(&request.from)), action: request.to.map_or(Action::Create, Action::Call), gas: request.gas.unwrap_or_else(default_gas), - gas_price: request.gas_price.unwrap_or_else(default_gas_price), + gas_price: request.gas_price.unwrap_or_else(|| miner.sensible_gas_price()), value: request.value.unwrap_or_else(U256::zero), data: request.data.map_or_else(Vec::new, |d| d.to_vec()), }.sign(&secret) @@ -515,8 +513,8 @@ impl Eth for EthClient fn call(&self, params: Params) -> Result { from_params_discard_second(params).and_then(|(request, )| { + let signed = try!(self.sign_call(request)); let client = take_weak!(self.client); - let signed = Self::sign_call(&client, request); let output = client.call(&signed).map(|e| Bytes(e.output)).unwrap_or(Bytes::new(vec![])); to_value(&output) }) @@ -524,8 +522,8 @@ impl Eth for EthClient fn estimate_gas(&self, params: Params) -> Result { from_params_discard_second(params).and_then(|(request, )| { + let signed = try!(self.sign_call(request)); let client = take_weak!(self.client); - let signed = Self::sign_call(&client, request); let used = client.call(&signed).map(|res| res.gas_used + res.refunded).unwrap_or(From::from(0)); to_value(&used) })