light: broadcast status updates to peers

This commit is contained in:
Robert Habermeier 2016-12-08 23:57:09 +01:00
parent 6f5f1f5e26
commit e7ce8c9558
2 changed files with 41 additions and 1 deletions

View File

@ -27,7 +27,7 @@ use network::{NetworkProtocolHandler, NetworkContext, NetworkError, PeerId};
use rlp::{RlpStream, Stream, UntrustedRlp, View}; use rlp::{RlpStream, Stream, UntrustedRlp, View};
use util::hash::H256; use util::hash::H256;
use util::{Bytes, Mutex, RwLock, U256}; use util::{Bytes, Mutex, RwLock, U256};
use time::SteadyTime; use time::{Duration, SteadyTime};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
@ -54,6 +54,9 @@ pub use self::status::{Status, Capabilities, Announcement};
const TIMEOUT: TimerToken = 0; const TIMEOUT: TimerToken = 0;
const TIMEOUT_INTERVAL_MS: u64 = 1000; const TIMEOUT_INTERVAL_MS: u64 = 1000;
// minimum interval between updates.
const UPDATE_INTERVAL_MS: i64 = 5000;
// Supported protocol versions. // Supported protocol versions.
pub const PROTOCOL_VERSIONS: &'static [u8] = &[1]; pub const PROTOCOL_VERSIONS: &'static [u8] = &[1];
@ -107,6 +110,7 @@ pub struct ReqId(usize);
// may not have received one for. // may not have received one for.
struct PendingPeer { struct PendingPeer {
sent_head: H256, sent_head: H256,
last_update: SteadyTime,
} }
// data about each peer. // data about each peer.
@ -117,6 +121,7 @@ struct Peer {
capabilities: Capabilities, capabilities: Capabilities,
remote_flow: FlowParams, remote_flow: FlowParams,
sent_head: H256, // last head we've given them. sent_head: H256, // last head we've given them.
last_update: SteadyTime,
} }
impl Peer { impl Peer {
@ -293,6 +298,7 @@ impl LightProtocol {
/// The announcement is expected to be valid. /// The announcement is expected to be valid.
pub fn make_announcement(&self, io: &IoContext, mut announcement: Announcement) { pub fn make_announcement(&self, io: &IoContext, mut announcement: Announcement) {
let mut reorgs_map = HashMap::new(); let mut reorgs_map = HashMap::new();
let now = SteadyTime::now();
// update stored capabilities // update stored capabilities
self.capabilities.write().update_from(&announcement); self.capabilities.write().update_from(&announcement);
@ -300,6 +306,17 @@ impl LightProtocol {
// calculate reorg info and send packets // calculate reorg info and send packets
for (peer_id, peer_info) in self.peers.read().iter() { for (peer_id, peer_info) in self.peers.read().iter() {
let mut peer_info = peer_info.lock(); let mut peer_info = peer_info.lock();
// TODO: "urgent" announcements like new blocks?
// the timer approach will skip 1 (possibly 2) in rare occasions.
if peer_info.sent_head == announcement.head_hash ||
peer_info.status.head_num >= announcement.head_num ||
now - peer_info.last_update < Duration::milliseconds(UPDATE_INTERVAL_MS) {
continue
}
peer_info.last_update = now;
let reorg_depth = reorgs_map.entry(peer_info.sent_head) let reorg_depth = reorgs_map.entry(peer_info.sent_head)
.or_insert_with(|| { .or_insert_with(|| {
match self.provider.reorg_depth(&announcement.head_hash, &peer_info.sent_head) { match self.provider.reorg_depth(&announcement.head_hash, &peer_info.sent_head) {
@ -496,6 +513,7 @@ impl LightProtocol {
Ok(PendingPeer { Ok(PendingPeer {
sent_head: chain_info.best_block_hash, sent_head: chain_info.best_block_hash,
last_update: SteadyTime::now(),
}) })
} }
@ -523,6 +541,7 @@ impl LightProtocol {
capabilities: capabilities.clone(), capabilities: capabilities.clone(),
remote_flow: flow_params, remote_flow: flow_params,
sent_head: pending.sent_head, sent_head: pending.sent_head,
last_update: pending.last_update,
})); }));
for handler in &self.handlers { for handler in &self.handlers {

View File

@ -279,6 +279,8 @@ impl ChainNotify for EthSync {
sealed: Vec<H256>, sealed: Vec<H256>,
_duration: u64) _duration: u64)
{ {
use light::net::Announcement;
self.network.with_context(self.subprotocol_name, |context| { self.network.with_context(self.subprotocol_name, |context| {
let mut sync_io = NetSyncIo::new(context, &*self.sync_handler.chain, &*self.sync_handler.snapshot_service, &self.sync_handler.overlay); let mut sync_io = NetSyncIo::new(context, &*self.sync_handler.chain, &*self.sync_handler.snapshot_service, &self.sync_handler.overlay);
self.sync_handler.sync.write().chain_new_blocks( self.sync_handler.sync.write().chain_new_blocks(
@ -289,6 +291,25 @@ impl ChainNotify for EthSync {
&retracted, &retracted,
&sealed); &sealed);
}); });
self.network.with_context(self.light_subprotocol_name, |context| {
let light_proto = match self.light_proto.as_ref() {
Some(lp) => lp,
None => return,
};
let chain_info = self.sync_handler.chain.chain_info();
light_proto.make_announcement(context, Announcement {
head_hash: chain_info.best_block_hash,
head_num: chain_info.best_block_number,
head_td: chain_info.total_difficulty,
reorg_depth: 0, // recalculated on a per-peer basis.
serve_headers: false, // these fields consist of _changes_ in capability.
serve_state_since: None,
serve_chain_since: None,
tx_relay: false,
})
})
} }
fn start(&self) { fn start(&self) {