Merge pull request #850 from ethcore/pubtxs
Publish locally-made transactions to peers.
This commit is contained in:
commit
6399c0a2c7
@ -64,7 +64,7 @@ mod transaction_queue;
|
|||||||
pub use transaction_queue::{TransactionQueue, AccountDetails};
|
pub use transaction_queue::{TransactionQueue, AccountDetails};
|
||||||
pub use miner::{Miner};
|
pub use miner::{Miner};
|
||||||
|
|
||||||
use util::{H256, Address, FixedHash, Bytes};
|
use util::{H256, U256, Address, FixedHash, Bytes};
|
||||||
use ethcore::client::{BlockChainClient};
|
use ethcore::client::{BlockChainClient};
|
||||||
use ethcore::block::{ClosedBlock};
|
use ethcore::block::{ClosedBlock};
|
||||||
use ethcore::error::{Error};
|
use ethcore::error::{Error};
|
||||||
@ -107,6 +107,9 @@ pub trait MinerService : Send + Sync {
|
|||||||
|
|
||||||
/// Query pending transactions for hash
|
/// Query pending transactions for hash
|
||||||
fn transaction(&self, hash: &H256) -> Option<SignedTransaction>;
|
fn transaction(&self, hash: &H256) -> Option<SignedTransaction>;
|
||||||
|
|
||||||
|
/// Suggested gas price
|
||||||
|
fn sensible_gas_price(&self) -> U256 { x!(20000000000u64) }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mining status
|
/// Mining status
|
||||||
|
@ -198,6 +198,11 @@ impl MinerService for Miner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn sensible_gas_price(&self) -> U256 {
|
||||||
|
// 10% above our minimum.
|
||||||
|
self.transaction_queue.lock().unwrap().minimal_gas_price().clone() * x!(110) / x!(100)
|
||||||
|
}
|
||||||
|
|
||||||
fn author(&self) -> Address {
|
fn author(&self) -> Address {
|
||||||
*self.author.read().unwrap()
|
*self.author.read().unwrap()
|
||||||
}
|
}
|
||||||
|
@ -312,6 +312,11 @@ impl TransactionQueue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the minimal gas price.
|
||||||
|
pub fn minimal_gas_price(&self) -> &U256 {
|
||||||
|
&self.minimal_gas_price
|
||||||
|
}
|
||||||
|
|
||||||
/// Sets new gas price threshold for incoming transactions.
|
/// Sets new gas price threshold for incoming transactions.
|
||||||
/// Any transaction already imported to the queue is not affected.
|
/// Any transaction already imported to the queue is not affected.
|
||||||
pub fn set_minimal_gas_price(&mut self, min_gas_price: U256) {
|
pub fn set_minimal_gas_price(&mut self, min_gas_price: U256) {
|
||||||
|
@ -31,7 +31,6 @@ use ethcore::client::*;
|
|||||||
use ethcore::block::IsBlock;
|
use ethcore::block::IsBlock;
|
||||||
use ethcore::views::*;
|
use ethcore::views::*;
|
||||||
use ethcore::ethereum::Ethash;
|
use ethcore::ethereum::Ethash;
|
||||||
use ethcore::ethereum::denominations::shannon;
|
|
||||||
use ethcore::transaction::{Transaction as EthTransaction, SignedTransaction, Action};
|
use ethcore::transaction::{Transaction as EthTransaction, SignedTransaction, Action};
|
||||||
use self::ethash::SeedHashCompute;
|
use self::ethash::SeedHashCompute;
|
||||||
use v1::traits::{Eth, EthFilter};
|
use v1::traits::{Eth, EthFilter};
|
||||||
@ -44,10 +43,6 @@ fn default_gas() -> U256 {
|
|||||||
U256::from(21_000)
|
U256::from(21_000)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_gas_price() -> U256 {
|
|
||||||
shannon() * U256::from(20)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Eth rpc implementation.
|
/// Eth rpc implementation.
|
||||||
pub struct EthClient<C, S, A, M, EM = ExternalMiner>
|
pub struct EthClient<C, S, A, M, EM = ExternalMiner>
|
||||||
where C: BlockChainClient,
|
where C: BlockChainClient,
|
||||||
@ -173,16 +168,41 @@ impl<C, S, A, M, EM> EthClient<C, S, A, M, EM>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn sign_call(client: &Arc<C>, request: CallRequest) -> SignedTransaction {
|
fn sign_call(&self, request: CallRequest) -> Result<SignedTransaction, Error> {
|
||||||
|
let client = take_weak!(self.client);
|
||||||
|
let miner = take_weak!(self.miner);
|
||||||
let from = request.from.unwrap_or(Address::zero());
|
let from = request.from.unwrap_or(Address::zero());
|
||||||
EthTransaction {
|
Ok(EthTransaction {
|
||||||
nonce: request.nonce.unwrap_or_else(|| client.nonce(&from)),
|
nonce: request.nonce.unwrap_or_else(|| client.nonce(&from)),
|
||||||
action: request.to.map_or(Action::Create, Action::Call),
|
action: request.to.map_or(Action::Create, Action::Call),
|
||||||
gas: request.gas.unwrap_or_else(default_gas),
|
gas: request.gas.unwrap_or_else(default_gas),
|
||||||
gas_price: request.gas_price.unwrap_or_else(default_gas_price),
|
gas_price: request.gas_price.unwrap_or_else(|| miner.sensible_gas_price()),
|
||||||
value: request.value.unwrap_or_else(U256::zero),
|
value: request.value.unwrap_or_else(U256::zero),
|
||||||
data: request.data.map_or_else(Vec::new, |d| d.to_vec())
|
data: request.data.map_or_else(Vec::new, |d| d.to_vec())
|
||||||
}.fake_sign(from)
|
}.fake_sign(from))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn dispatch_transaction(&self, signed_transaction: SignedTransaction, raw_transaction: Vec<u8>) -> Result<Value, Error> {
|
||||||
|
let hash = signed_transaction.hash();
|
||||||
|
|
||||||
|
let import = {
|
||||||
|
let client = take_weak!(self.client);
|
||||||
|
take_weak!(self.miner).import_transactions(vec![signed_transaction], |a: &Address| AccountDetails {
|
||||||
|
nonce: client.nonce(a),
|
||||||
|
balance: client.balance(a),
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
match import.into_iter().collect::<Result<Vec<_>, _>>() {
|
||||||
|
Ok(_) => {
|
||||||
|
take_weak!(self.sync).new_transaction(raw_transaction);
|
||||||
|
to_value(&hash)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Error sending transaction: {:?}", e);
|
||||||
|
to_value(&H256::zero())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -267,7 +287,7 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
|
|||||||
|
|
||||||
fn gas_price(&self, params: Params) -> Result<Value, Error> {
|
fn gas_price(&self, params: Params) -> Result<Value, Error> {
|
||||||
match params {
|
match params {
|
||||||
Params::None => to_value(&default_gas_price()),
|
Params::None => to_value(&take_weak!(self.miner).sensible_gas_price()),
|
||||||
_ => Err(Error::invalid_params())
|
_ => Err(Error::invalid_params())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -460,32 +480,20 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
|
|||||||
let accounts = take_weak!(self.accounts);
|
let accounts = take_weak!(self.accounts);
|
||||||
match accounts.account_secret(&request.from) {
|
match accounts.account_secret(&request.from) {
|
||||||
Ok(secret) => {
|
Ok(secret) => {
|
||||||
let miner = take_weak!(self.miner);
|
let signed_transaction = {
|
||||||
let client = take_weak!(self.client);
|
let client = take_weak!(self.client);
|
||||||
|
let miner = take_weak!(self.miner);
|
||||||
let transaction = EthTransaction {
|
EthTransaction {
|
||||||
nonce: request.nonce.unwrap_or_else(|| client.nonce(&request.from)),
|
nonce: request.nonce.unwrap_or_else(|| client.nonce(&request.from)),
|
||||||
action: request.to.map_or(Action::Create, Action::Call),
|
action: request.to.map_or(Action::Create, Action::Call),
|
||||||
gas: request.gas.unwrap_or_else(default_gas),
|
gas: request.gas.unwrap_or_else(default_gas),
|
||||||
gas_price: request.gas_price.unwrap_or_else(default_gas_price),
|
gas_price: request.gas_price.unwrap_or_else(|| miner.sensible_gas_price()),
|
||||||
value: request.value.unwrap_or_else(U256::zero),
|
value: request.value.unwrap_or_else(U256::zero),
|
||||||
data: request.data.map_or_else(Vec::new, |d| d.to_vec())
|
data: request.data.map_or_else(Vec::new, |d| d.to_vec()),
|
||||||
|
}.sign(&secret)
|
||||||
};
|
};
|
||||||
|
let raw_transaction = encode(&signed_transaction).to_vec();
|
||||||
let signed_transaction = transaction.sign(&secret);
|
self.dispatch_transaction(signed_transaction, raw_transaction)
|
||||||
let hash = signed_transaction.hash();
|
|
||||||
|
|
||||||
let import = miner.import_transactions(vec![signed_transaction], |a: &Address| AccountDetails {
|
|
||||||
nonce: client.nonce(a),
|
|
||||||
balance: client.balance(a),
|
|
||||||
});
|
|
||||||
match import.into_iter().collect::<Result<Vec<_>, _>>() {
|
|
||||||
Ok(_) => to_value(&hash),
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Error sending transaction: {:?}", e);
|
|
||||||
to_value(&H256::zero())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
Err(_) => { to_value(&H256::zero()) }
|
Err(_) => { to_value(&H256::zero()) }
|
||||||
}
|
}
|
||||||
@ -495,34 +503,18 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
|
|||||||
fn send_raw_transaction(&self, params: Params) -> Result<Value, Error> {
|
fn send_raw_transaction(&self, params: Params) -> Result<Value, Error> {
|
||||||
from_params::<(Bytes, )>(params)
|
from_params::<(Bytes, )>(params)
|
||||||
.and_then(|(raw_transaction, )| {
|
.and_then(|(raw_transaction, )| {
|
||||||
let decoded: Result<SignedTransaction, _> = UntrustedRlp::new(&raw_transaction.to_vec()).as_val();
|
let raw_transaction = raw_transaction.to_vec();
|
||||||
match decoded {
|
match UntrustedRlp::new(&raw_transaction).as_val() {
|
||||||
Ok(signed_tx) => {
|
Ok(signed_transaction) => self.dispatch_transaction(signed_transaction, raw_transaction),
|
||||||
let miner = take_weak!(self.miner);
|
Err(_) => to_value(&H256::zero()),
|
||||||
let client = take_weak!(self.client);
|
|
||||||
|
|
||||||
let hash = signed_tx.hash();
|
|
||||||
let import = miner.import_transactions(vec![signed_tx], |a: &Address| AccountDetails {
|
|
||||||
nonce: client.nonce(a),
|
|
||||||
balance: client.balance(a),
|
|
||||||
});
|
|
||||||
match import.into_iter().collect::<Result<Vec<_>, _>>() {
|
|
||||||
Ok(_) => to_value(&hash),
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Error sending transaction: {:?}", e);
|
|
||||||
to_value(&H256::zero())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(_) => { to_value(&H256::zero()) }
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&self, params: Params) -> Result<Value, Error> {
|
fn call(&self, params: Params) -> Result<Value, Error> {
|
||||||
from_params_discard_second(params).and_then(|(request, )| {
|
from_params_discard_second(params).and_then(|(request, )| {
|
||||||
|
let signed = try!(self.sign_call(request));
|
||||||
let client = take_weak!(self.client);
|
let client = take_weak!(self.client);
|
||||||
let signed = Self::sign_call(&client, request);
|
|
||||||
let output = client.call(&signed).map(|e| Bytes(e.output)).unwrap_or(Bytes::new(vec![]));
|
let output = client.call(&signed).map(|e| Bytes(e.output)).unwrap_or(Bytes::new(vec![]));
|
||||||
to_value(&output)
|
to_value(&output)
|
||||||
})
|
})
|
||||||
@ -530,8 +522,8 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
|
|||||||
|
|
||||||
fn estimate_gas(&self, params: Params) -> Result<Value, Error> {
|
fn estimate_gas(&self, params: Params) -> Result<Value, Error> {
|
||||||
from_params_discard_second(params).and_then(|(request, )| {
|
from_params_discard_second(params).and_then(|(request, )| {
|
||||||
|
let signed = try!(self.sign_call(request));
|
||||||
let client = take_weak!(self.client);
|
let client = take_weak!(self.client);
|
||||||
let signed = Self::sign_call(&client, request);
|
|
||||||
let used = client.call(&signed).map(|res| res.gas_used + res.refunded).unwrap_or(From::from(0));
|
let used = client.call(&signed).map(|res| res.gas_used + res.refunded).unwrap_or(From::from(0));
|
||||||
to_value(&used)
|
to_value(&used)
|
||||||
})
|
})
|
||||||
|
@ -16,7 +16,7 @@
|
|||||||
|
|
||||||
//! Test implementation of SyncProvider.
|
//! Test implementation of SyncProvider.
|
||||||
|
|
||||||
use util::U256;
|
use util::{U256, Bytes};
|
||||||
use ethsync::{SyncProvider, SyncStatus, SyncState};
|
use ethsync::{SyncProvider, SyncStatus, SyncState};
|
||||||
use std::sync::{RwLock};
|
use std::sync::{RwLock};
|
||||||
|
|
||||||
@ -59,5 +59,8 @@ impl SyncProvider for TestSyncProvider {
|
|||||||
fn status(&self) -> SyncStatus {
|
fn status(&self) -> SyncStatus {
|
||||||
self.status.read().unwrap().clone()
|
self.status.read().unwrap().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn new_transaction(&self, _raw_transaction: Bytes) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -217,6 +217,10 @@ pub struct ChainSync {
|
|||||||
network_id: U256,
|
network_id: U256,
|
||||||
/// Miner
|
/// Miner
|
||||||
miner: Arc<Miner>,
|
miner: Arc<Miner>,
|
||||||
|
|
||||||
|
/// Transactions to propagate
|
||||||
|
// TODO: reconsider where this is in the codebase - seems a little dodgy to have here.
|
||||||
|
transactions_to_send: Vec<Bytes>,
|
||||||
}
|
}
|
||||||
|
|
||||||
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
|
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
|
||||||
@ -243,6 +247,7 @@ impl ChainSync {
|
|||||||
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
|
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
|
||||||
network_id: config.network_id,
|
network_id: config.network_id,
|
||||||
miner: miner,
|
miner: miner,
|
||||||
|
transactions_to_send: vec![],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -938,6 +943,12 @@ impl ChainSync {
|
|||||||
sync.disable_peer(peer_id);
|
sync.disable_peer(peer_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Place a new transaction on the wire.
|
||||||
|
pub fn new_transaction(&mut self, raw_transaction: Bytes) {
|
||||||
|
self.transactions_to_send.push(raw_transaction);
|
||||||
|
}
|
||||||
|
|
||||||
/// Called when peer sends us new transactions
|
/// Called when peer sends us new transactions
|
||||||
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||||
// accepting transactions once only fully synced
|
// accepting transactions once only fully synced
|
||||||
@ -1271,7 +1282,44 @@ impl ChainSync {
|
|||||||
sent
|
sent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// propagates new transactions to all peers
|
||||||
|
fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize {
|
||||||
|
|
||||||
|
// Early out of nobody to send to.
|
||||||
|
if self.peers.len() == 0 {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut packet = RlpStream::new_list(self.transactions_to_send.len());
|
||||||
|
for tx in self.transactions_to_send.iter() {
|
||||||
|
packet.append_raw(tx, 1);
|
||||||
|
}
|
||||||
|
self.transactions_to_send.clear();
|
||||||
|
let rlp = packet.out();
|
||||||
|
|
||||||
|
let lucky_peers = {
|
||||||
|
// sqrt(x)/x scaled to max u32
|
||||||
|
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
|
||||||
|
let small = self.peers.len() < MIN_PEERS_PROPAGATION;
|
||||||
|
let lucky_peers = self.peers.iter()
|
||||||
|
.filter_map(|(&p, _)| if small || ::rand::random::<u32>() < fraction { Some(p.clone()) } else { None })
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
// taking at max of MAX_PEERS_PROPAGATION
|
||||||
|
lucky_peers.iter().map(|&id| id.clone()).take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::<Vec<PeerId>>()
|
||||||
|
};
|
||||||
|
|
||||||
|
let sent = lucky_peers.len();
|
||||||
|
for peer_id in lucky_peers {
|
||||||
|
self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp.clone());
|
||||||
|
}
|
||||||
|
sent
|
||||||
|
}
|
||||||
|
|
||||||
fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
|
fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
|
||||||
|
if !self.transactions_to_send.is_empty() {
|
||||||
|
self.propagate_new_transactions(io);
|
||||||
|
}
|
||||||
let chain_info = io.chain().chain_info();
|
let chain_info = io.chain().chain_info();
|
||||||
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
|
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
|
||||||
let blocks = self.propagate_blocks(&chain_info, io);
|
let blocks = self.propagate_blocks(&chain_info, io);
|
||||||
|
@ -66,7 +66,7 @@ use std::ops::*;
|
|||||||
use std::sync::*;
|
use std::sync::*;
|
||||||
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
|
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
|
||||||
use util::TimerToken;
|
use util::TimerToken;
|
||||||
use util::{U256, ONE_U256};
|
use util::{U256, Bytes, ONE_U256};
|
||||||
use ethcore::client::Client;
|
use ethcore::client::Client;
|
||||||
use ethcore::service::SyncMessage;
|
use ethcore::service::SyncMessage;
|
||||||
use ethminer::Miner;
|
use ethminer::Miner;
|
||||||
@ -101,6 +101,9 @@ impl Default for SyncConfig {
|
|||||||
pub trait SyncProvider: Send + Sync {
|
pub trait SyncProvider: Send + Sync {
|
||||||
/// Get sync status
|
/// Get sync status
|
||||||
fn status(&self) -> SyncStatus;
|
fn status(&self) -> SyncStatus;
|
||||||
|
|
||||||
|
/// Note that a user has submitted a new transaction.
|
||||||
|
fn new_transaction(&self, raw_transaction: Bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ethereum network protocol handler
|
/// Ethereum network protocol handler
|
||||||
@ -140,6 +143,11 @@ impl SyncProvider for EthSync {
|
|||||||
fn status(&self) -> SyncStatus {
|
fn status(&self) -> SyncStatus {
|
||||||
self.sync.read().unwrap().status()
|
self.sync.read().unwrap().status()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Note that a user has submitted a new transaction.
|
||||||
|
fn new_transaction(&self, raw_transaction: Bytes) {
|
||||||
|
self.sync.write().unwrap().new_transaction(raw_transaction);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NetworkProtocolHandler<SyncMessage> for EthSync {
|
impl NetworkProtocolHandler<SyncMessage> for EthSync {
|
||||||
|
Loading…
Reference in New Issue
Block a user