Merge pull request #3796 from ethcore/tx-broadcast

Avoid broadcasting transactions to peers that send them
This commit is contained in:
Gav Wood 2016-12-12 04:13:56 +01:00 committed by GitHub
commit c0a2d5c8f5
15 changed files with 119 additions and 72 deletions

View File

@ -16,7 +16,7 @@
//! I/O and event context generalizations.
use network::{NetworkContext, PeerId};
use network::{NetworkContext, PeerId, NodeId};
use super::{Announcement, LightProtocol, ReqId};
use super::error::Error;
@ -41,8 +41,12 @@ pub trait IoContext {
/// Get a peer's protocol version.
fn protocol_version(&self, peer: PeerId) -> Option<u8>;
/// Persistent peer id
fn persistent_peer_id(&self, peer: PeerId) -> Option<NodeId>;
}
impl<'a> IoContext for NetworkContext<'a> {
fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec<u8>) {
if let Err(e) = self.send(peer, packet_id, packet_body) {
@ -67,6 +71,10 @@ impl<'a> IoContext for NetworkContext<'a> {
fn protocol_version(&self, peer: PeerId) -> Option<u8> {
self.protocol_version(self.subprotocol_name(), peer)
}
fn persistent_peer_id(&self, peer: PeerId) -> Option<NodeId> {
self.session_info(peer).and_then(|info| info.id)
}
}
/// Context for a protocol event.
@ -75,6 +83,9 @@ pub trait EventContext {
/// disconnected/connected peer.
fn peer(&self) -> PeerId;
/// Returns the relevant's peer persistent Id (aka NodeId).
fn persistent_peer_id(&self, peer: PeerId) -> Option<NodeId>;
/// Make a request from a peer.
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error>;
@ -101,7 +112,14 @@ pub struct Ctx<'a> {
}
impl<'a> EventContext for Ctx<'a> {
fn peer(&self) -> PeerId { self.peer }
fn peer(&self) -> PeerId {
self.peer
}
fn persistent_peer_id(&self, id: PeerId) -> Option<NodeId> {
self.io.persistent_peer_id(id)
}
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error> {
self.proto.request_from(self.io, &peer, request)
}

View File

@ -21,7 +21,7 @@ use ethcore::blockchain_info::BlockChainInfo;
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient};
use ethcore::ids::BlockId;
use ethcore::transaction::SignedTransaction;
use network::PeerId;
use network::{PeerId, NodeId};
use net::buffer_flow::FlowParams;
use net::context::IoContext;
@ -68,6 +68,10 @@ impl IoContext for Expect {
fn protocol_version(&self, _peer: PeerId) -> Option<u8> {
Some(super::MAX_PROTOCOL_VERSION)
}
fn persistent_peer_id(&self, _peer: PeerId) -> Option<NodeId> {
None
}
}
// can't implement directly for Arc due to cross-crate orphan rules.

View File

@ -40,6 +40,14 @@ pub trait ChainNotify : Send + Sync {
fn stop(&self) {
// does nothing by default
}
/// fires when new transactions are received from a peer
fn transactions_received(&self,
_hashes: Vec<H256>,
_peer_id: usize,
) {
// does nothing by default
}
}
impl IpcConfig for ChainNotify { }

View File

@ -559,11 +559,15 @@ impl Client {
}
/// Import transactions from the IO queue
pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize {
pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: usize) -> usize {
trace!(target: "external_tx", "Importing queued");
let _timer = PerfTimer::new("import_queued_transactions");
self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
let txs = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect();
let txs: Vec<SignedTransaction> = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect();
let hashes: Vec<_> = txs.iter().map(|tx| tx.hash()).collect();
self.notify(|notify| {
notify.transactions_received(hashes.clone(), peer_id);
});
let results = self.miner.import_external_transactions(self, txs);
results.len()
}
@ -1264,14 +1268,14 @@ impl BlockChainClient for Client {
(*self.build_last_hashes(self.chain.read().best_block_hash())).clone()
}
fn queue_transactions(&self, transactions: Vec<Bytes>) {
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
let queue_size = self.queue_transactions.load(AtomicOrdering::Relaxed);
trace!(target: "external_tx", "Queue size: {}", queue_size);
if queue_size > MAX_TX_QUEUE_SIZE {
debug!("Ignoring {} transactions: queue is full", transactions.len());
} else {
let len = transactions.len();
match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions)) {
match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, peer_id)) {
Ok(_) => {
self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst);
}

View File

@ -657,7 +657,7 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!();
}
fn queue_transactions(&self, transactions: Vec<Bytes>) {
fn queue_transactions(&self, transactions: Vec<Bytes>, _peer_id: usize) {
// import right here
let txs = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
self.miner.import_external_transactions(self, txs);

View File

@ -200,7 +200,7 @@ pub trait BlockChainClient : Sync + Send {
fn last_hashes(&self) -> LastHashes;
/// Queue transactions for importing.
fn queue_transactions(&self, transactions: Vec<Bytes>);
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize);
/// list all transactions
fn pending_transactions(&self) -> Vec<SignedTransaction>;

View File

@ -39,7 +39,7 @@ pub enum ClientIoMessage {
/// A block is ready
BlockVerified,
/// New transaction RLPs are ready to be imported
NewTransactions(Vec<Bytes>),
NewTransactions(Vec<Bytes>, usize),
/// Begin snapshot restoration
BeginRestoration(ManifestData),
/// Feed a state chunk to the snapshot service
@ -196,7 +196,9 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
match *net_message {
ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); }
ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); }
ClientIoMessage::NewTransactions(ref transactions, peer_id) => {
self.client.import_queued_transactions(transactions, peer_id);
}
ClientIoMessage::BeginRestoration(ref manifest) => {
if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) {
warn!("Failed to initialize snapshot restoration: {}", e);

View File

@ -48,7 +48,6 @@ class BaseTransaction extends Component {
<IdentityIcon
address={ transaction.from }
/>
0x{ transaction.nonce.toString(16) }
</div>
);
}

View File

@ -34,7 +34,7 @@ describe('dapps/localtx/Transaction', () => {
it('renders without crashing', () => {
const transaction = {
hash: '0x1234567890',
nonce: 15,
nonce: new BigNumber(15),
gasPrice: new BigNumber(10),
gas: new BigNumber(10)
};

View File

@ -105,13 +105,13 @@ impl SyncProvider for TestSyncProvider {
first_seen: 10,
propagated_to: map![
128.into() => 16
]
],
},
5.into() => TransactionStats {
first_seen: 16,
propagated_to: map![
16.into() => 1
]
],
}
]
}

View File

@ -157,7 +157,7 @@ impl From<SyncTransactionStats> for TransactionStats {
propagated_to: s.propagated_to
.into_iter()
.map(|(id, count)| (id.into(), count))
.collect()
.collect(),
}
}
}
@ -208,7 +208,7 @@ mod tests {
first_seen: 100,
propagated_to: map![
10.into() => 50
]
],
};
let serialized = serde_json::to_string(&stats).unwrap();

View File

@ -335,6 +335,11 @@ impl ChainNotify for EthSync {
self.sync_handler.snapshot_service.abort_restore();
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
}
fn transactions_received(&self, hashes: Vec<H256>, peer_id: PeerId) {
let mut sync = self.sync_handler.sync.write();
sync.transactions_received(hashes, peer_id);
}
}
/// LES event handler.
@ -344,7 +349,7 @@ struct TxRelay(Arc<BlockChainClient>);
impl LightHandler for TxRelay {
fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) {
trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer());
self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect())
self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), ctx.peer())
}
}

View File

@ -432,6 +432,13 @@ impl ChainSync {
self.transactions_stats.stats()
}
/// Updates transactions were received by a peer
pub fn transactions_received(&mut self, hashes: Vec<H256>, peer_id: PeerId) {
if let Some(mut peer_info) = self.peers.get_mut(&peer_id) {
peer_info.last_sent_transactions.extend(&hashes);
}
}
/// Abort all sync activity
pub fn abort(&mut self, io: &mut SyncIo) {
self.reset_and_continue(io);
@ -1409,7 +1416,7 @@ impl ChainSync {
let tx = rlp.as_raw().to_vec();
transactions.push(tx);
}
io.chain().queue_transactions(transactions);
io.chain().queue_transactions(transactions, peer_id);
Ok(())
}

View File

@ -112,7 +112,7 @@ mod tests {
propagated_to: hash_map![
enodeid1 => 2,
enodeid2 => 1
]
],
}));
}

View File

@ -99,7 +99,7 @@ pub use stats::NetworkStats;
pub use session::SessionInfo;
use io::TimerToken;
pub use node_table::is_valid_node_url;
pub use node_table::{is_valid_node_url, NodeId};
const PROTOCOL_VERSION: u32 = 4;