Publish locally-made transactions to peers.

This commit is contained in:
Gav Wood 2016-03-28 18:11:00 +02:00
parent 6da9e19f73
commit 9592ccc0df
4 changed files with 100 additions and 47 deletions

View File

@ -184,6 +184,29 @@ impl<C, S, A, M, EM> EthClient<C, S, A, M, EM>
data: request.data.map_or_else(Vec::new, |d| d.to_vec()) data: request.data.map_or_else(Vec::new, |d| d.to_vec())
}.fake_sign(from) }.fake_sign(from)
} }
fn dispatch_transaction(&self, signed_transaction: SignedTransaction, raw_transaction: Vec<u8>) -> Result<Value, Error> {
let hash = signed_transaction.hash();
let import = {
let client = take_weak!(self.client);
take_weak!(self.miner).import_transactions(vec![signed_transaction], |a: &Address| AccountDetails {
nonce: client.nonce(a),
balance: client.balance(a),
})
};
match import.into_iter().collect::<Result<Vec<_>, _>>() {
Ok(_) => {
take_weak!(self.sync).new_transaction(raw_transaction);
to_value(&hash)
}
Err(e) => {
warn!("Error sending transaction: {:?}", e);
to_value(&H256::zero())
}
}
}
} }
const MAX_QUEUE_SIZE_TO_MINE_ON: usize = 4; // because uncles go back 6. const MAX_QUEUE_SIZE_TO_MINE_ON: usize = 4; // because uncles go back 6.
@ -460,32 +483,19 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
let accounts = take_weak!(self.accounts); let accounts = take_weak!(self.accounts);
match accounts.account_secret(&request.from) { match accounts.account_secret(&request.from) {
Ok(secret) => { Ok(secret) => {
let miner = take_weak!(self.miner); let signed_transaction = {
let client = take_weak!(self.client); let client = take_weak!(self.client);
EthTransaction {
let transaction = EthTransaction { nonce: request.nonce.unwrap_or_else(|| client.nonce(&request.from)),
nonce: request.nonce.unwrap_or_else(|| client.nonce(&request.from)), action: request.to.map_or(Action::Create, Action::Call),
action: request.to.map_or(Action::Create, Action::Call), gas: request.gas.unwrap_or_else(default_gas),
gas: request.gas.unwrap_or_else(default_gas), gas_price: request.gas_price.unwrap_or_else(default_gas_price),
gas_price: request.gas_price.unwrap_or_else(default_gas_price), value: request.value.unwrap_or_else(U256::zero),
value: request.value.unwrap_or_else(U256::zero), data: request.data.map_or_else(Vec::new, |d| d.to_vec()),
data: request.data.map_or_else(Vec::new, |d| d.to_vec()) }.sign(&secret)
}; };
let raw_transaction = encode(&signed_transaction).to_vec();
let signed_transaction = transaction.sign(&secret); self.dispatch_transaction(signed_transaction, raw_transaction)
let hash = signed_transaction.hash();
let import = miner.import_transactions(vec![signed_transaction], |a: &Address| AccountDetails {
nonce: client.nonce(a),
balance: client.balance(a),
});
match import.into_iter().collect::<Result<Vec<_>, _>>() {
Ok(_) => to_value(&hash),
Err(e) => {
warn!("Error sending transaction: {:?}", e);
to_value(&H256::zero())
}
}
}, },
Err(_) => { to_value(&H256::zero()) } Err(_) => { to_value(&H256::zero()) }
} }
@ -495,26 +505,10 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
fn send_raw_transaction(&self, params: Params) -> Result<Value, Error> { fn send_raw_transaction(&self, params: Params) -> Result<Value, Error> {
from_params::<(Bytes, )>(params) from_params::<(Bytes, )>(params)
.and_then(|(raw_transaction, )| { .and_then(|(raw_transaction, )| {
let decoded: Result<SignedTransaction, _> = UntrustedRlp::new(&raw_transaction.to_vec()).as_val(); let raw_transaction = raw_transaction.to_vec();
match decoded { match UntrustedRlp::new(&raw_transaction).as_val() {
Ok(signed_tx) => { Ok(signed_transaction) => self.dispatch_transaction(signed_transaction, raw_transaction),
let miner = take_weak!(self.miner); Err(_) => to_value(&H256::zero()),
let client = take_weak!(self.client);
let hash = signed_tx.hash();
let import = miner.import_transactions(vec![signed_tx], |a: &Address| AccountDetails {
nonce: client.nonce(a),
balance: client.balance(a),
});
match import.into_iter().collect::<Result<Vec<_>, _>>() {
Ok(_) => to_value(&hash),
Err(e) => {
warn!("Error sending transaction: {:?}", e);
to_value(&H256::zero())
}
}
},
Err(_) => { to_value(&H256::zero()) }
} }
}) })
} }

View File

@ -16,7 +16,7 @@
//! Test implementation of SyncProvider. //! Test implementation of SyncProvider.
use util::U256; use util::{U256, Bytes};
use ethsync::{SyncProvider, SyncStatus, SyncState}; use ethsync::{SyncProvider, SyncStatus, SyncState};
use std::sync::{RwLock}; use std::sync::{RwLock};
@ -59,5 +59,8 @@ impl SyncProvider for TestSyncProvider {
fn status(&self) -> SyncStatus { fn status(&self) -> SyncStatus {
self.status.read().unwrap().clone() self.status.read().unwrap().clone()
} }
fn new_transaction(&self, _raw_transaction: Bytes) {
}
} }

View File

@ -217,6 +217,10 @@ pub struct ChainSync {
network_id: U256, network_id: U256,
/// Miner /// Miner
miner: Arc<Miner>, miner: Arc<Miner>,
/// Transactions to propagate
// TODO: reconsider where this is in the codebase - seems a little dodgy to have here.
transactions_to_send: Vec<Bytes>,
} }
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>; type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
@ -243,6 +247,7 @@ impl ChainSync {
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id, network_id: config.network_id,
miner: miner, miner: miner,
transactions_to_send: vec![],
} }
} }
@ -938,6 +943,12 @@ impl ChainSync {
sync.disable_peer(peer_id); sync.disable_peer(peer_id);
} }
} }
/// Place a new transaction on the wire.
pub fn new_transaction(&mut self, raw_transaction: Bytes) {
self.transactions_to_send.push(raw_transaction);
}
/// Called when peer sends us new transactions /// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
// accepting transactions once only fully synced // accepting transactions once only fully synced
@ -1271,7 +1282,44 @@ impl ChainSync {
sent sent
} }
/// propagates new transactions to all peers
fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize {
// Early out of nobody to send to.
if self.peers.len() == 0 {
return 0;
}
let mut packet = RlpStream::new_list(self.transactions_to_send.len());
for tx in self.transactions_to_send.iter() {
packet.append_raw(tx, 1);
}
self.transactions_to_send.clear();
let rlp = packet.out();
let lucky_peers = {
// sqrt(x)/x scaled to max u32
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
let small = self.peers.len() < MIN_PEERS_PROPAGATION;
let lucky_peers = self.peers.iter()
.filter_map(|(&p, _)| if small || ::rand::random::<u32>() < fraction { Some(p.clone()) } else { None })
.collect::<Vec<_>>();
// taking at max of MAX_PEERS_PROPAGATION
lucky_peers.iter().map(|&id| id.clone()).take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::<Vec<PeerId>>()
};
let sent = lucky_peers.len();
for peer_id in lucky_peers {
self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp.clone());
}
sent
}
fn propagate_latest_blocks(&mut self, io: &mut SyncIo) { fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
if !self.transactions_to_send.is_empty() {
self.propagate_new_transactions(io);
}
let chain_info = io.chain().chain_info(); let chain_info = io.chain().chain_info();
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let blocks = self.propagate_blocks(&chain_info, io); let blocks = self.propagate_blocks(&chain_info, io);

View File

@ -66,7 +66,7 @@ use std::ops::*;
use std::sync::*; use std::sync::*;
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
use util::TimerToken; use util::TimerToken;
use util::{U256, ONE_U256}; use util::{U256, Bytes, ONE_U256};
use ethcore::client::Client; use ethcore::client::Client;
use ethcore::service::SyncMessage; use ethcore::service::SyncMessage;
use ethminer::Miner; use ethminer::Miner;
@ -101,6 +101,9 @@ impl Default for SyncConfig {
pub trait SyncProvider: Send + Sync { pub trait SyncProvider: Send + Sync {
/// Get sync status /// Get sync status
fn status(&self) -> SyncStatus; fn status(&self) -> SyncStatus;
/// Note that a user has submitted a new transaction.
fn new_transaction(&self, raw_transaction: Bytes);
} }
/// Ethereum network protocol handler /// Ethereum network protocol handler
@ -140,6 +143,11 @@ impl SyncProvider for EthSync {
fn status(&self) -> SyncStatus { fn status(&self) -> SyncStatus {
self.sync.read().unwrap().status() self.sync.read().unwrap().status()
} }
/// Note that a user has submitted a new transaction.
fn new_transaction(&self, raw_transaction: Bytes) {
self.sync.write().unwrap().new_transaction(raw_transaction);
}
} }
impl NetworkProtocolHandler<SyncMessage> for EthSync { impl NetworkProtocolHandler<SyncMessage> for EthSync {