transaction propagation on a timer

This commit is contained in:
Robert Habermeier 2017-03-23 20:02:46 +01:00
parent b76860fd2b
commit e0a79699ea
3 changed files with 51 additions and 3 deletions

View File

@ -27,7 +27,7 @@ use util::hash::H256;
use util::{DBValue, Mutex, RwLock, U256}; use util::{DBValue, Mutex, RwLock, U256};
use time::{Duration, SteadyTime}; use time::{Duration, SteadyTime};
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
@ -61,6 +61,9 @@ const TIMEOUT_INTERVAL_MS: u64 = 1000;
const TICK_TIMEOUT: TimerToken = 1; const TICK_TIMEOUT: TimerToken = 1;
const TICK_TIMEOUT_INTERVAL_MS: u64 = 5000; const TICK_TIMEOUT_INTERVAL_MS: u64 = 5000;
const PROPAGATE_TIMEOUT: TimerToken = 2;
const PROPAGATE_TIMEOUT_INTERVAL_MS: u64 = 5000;
// minimum interval between updates. // minimum interval between updates.
const UPDATE_INTERVAL_MS: i64 = 5000; const UPDATE_INTERVAL_MS: i64 = 5000;
@ -132,6 +135,7 @@ pub struct Peer {
last_update: SteadyTime, last_update: SteadyTime,
pending_requests: RequestSet, pending_requests: RequestSet,
failed_requests: Vec<ReqId>, failed_requests: Vec<ReqId>,
propagated_transactions: HashSet<H256>,
} }
/// A light protocol event handler. /// A light protocol event handler.
@ -499,6 +503,47 @@ impl LightProtocol {
} }
} }
// propagate transactions to relay peers.
// if we aren't on the mainnet, we just propagate to all relay peers
fn propagate_transactions(&self, io: &IoContext) {
if self.capabilities.read().tx_relay { return }
let ready_transactions = self.provider.ready_transactions();
if ready_transactions.is_empty() { return }
trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len());
let all_transaction_hashes: HashSet<_> = ready_transactions.iter().map(|tx| tx.hash()).collect();
let mut buf = Vec::new();
let peers = self.peers.read();
for (peer_id, peer_info) in peers.iter() {
let mut peer_info = peer_info.lock();
if !peer_info.capabilities.tx_relay { continue }
let prop_filter = &mut peer_info.propagated_transactions;
*prop_filter = &*prop_filter & &all_transaction_hashes;
// fill the buffer with all non-propagated transactions.
let to_propagate = ready_transactions.iter()
.filter(|tx| prop_filter.insert(tx.hash()))
.map(|tx| &tx.transaction);
buf.extend(to_propagate);
// propagate to the given peer.
if buf.is_empty() { continue }
io.send(*peer_id, packet::SEND_TRANSACTIONS, {
let mut stream = RlpStream::new_list(buf.len());
for pending_tx in buf.drain(..) {
stream.append(pending_tx);
}
stream.out()
})
}
}
/// called when a peer connects. /// called when a peer connects.
pub fn on_connect(&self, peer: &PeerId, io: &IoContext) { pub fn on_connect(&self, peer: &PeerId, io: &IoContext) {
let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) { let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) {
@ -613,6 +658,7 @@ impl LightProtocol {
last_update: pending.last_update, last_update: pending.last_update,
pending_requests: RequestSet::default(), pending_requests: RequestSet::default(),
failed_requests: Vec::new(), failed_requests: Vec::new(),
propagated_transactions: HashSet::new(),
})); }));
for handler in &self.handlers { for handler in &self.handlers {
@ -797,6 +843,8 @@ impl NetworkProtocolHandler for LightProtocol {
.expect("Error registering sync timer."); .expect("Error registering sync timer.");
io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL_MS) io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL_MS)
.expect("Error registering sync timer."); .expect("Error registering sync timer.");
io.register_timer(PROPAGATE_TIMEOUT, PROPAGATE_TIMEOUT_INTERVAL_MS)
.expect("Error registering sync timer.");
} }
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
@ -815,6 +863,7 @@ impl NetworkProtocolHandler for LightProtocol {
match timer { match timer {
TIMEOUT => self.timeout_check(io), TIMEOUT => self.timeout_check(io),
TICK_TIMEOUT => self.tick_handlers(io), TICK_TIMEOUT => self.tick_handlers(io),
PROPAGATE_TIMEOUT => self.propagate_transactions(io),
_ => warn!(target: "pip", "received timeout on unknown token {}", timer), _ => warn!(target: "pip", "received timeout on unknown token {}", timer),
} }
} }

View File

@ -348,7 +348,6 @@ impl Dependencies for LightDependencies {
// TODO: filters. // TODO: filters.
add_signing_methods!(EthSigning, handler, self); add_signing_methods!(EthSigning, handler, self);
}, },
Api::Personal => { Api::Personal => {
handler.extend_with(PersonalClient::new(&self.secret_store, dispatcher.clone(), self.geth_compatibility).to_delegate()); handler.extend_with(PersonalClient::new(&self.secret_store, dispatcher.clone(), self.geth_compatibility).to_delegate());

View File

@ -414,7 +414,7 @@ struct TxRelay(Arc<BlockChainClient>);
impl LightHandler for TxRelay { impl LightHandler for TxRelay {
fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::UnverifiedTransaction]) { fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::UnverifiedTransaction]) {
trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer()); trace!(target: "pip", "Relaying {} transactions from peer {}", relay.len(), ctx.peer());
self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), ctx.peer()) self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), ctx.peer())
} }
} }