les: flesh out event handler
This commit is contained in:
@@ -20,11 +20,13 @@
|
||||
//! See https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES)
|
||||
|
||||
use ethcore::transaction::SignedTransaction;
|
||||
use ethcore::receipt::Receipt;
|
||||
|
||||
use io::TimerToken;
|
||||
use network::{NetworkProtocolHandler, NetworkContext, NetworkError, PeerId};
|
||||
use rlp::{RlpStream, Stream, UntrustedRlp, View};
|
||||
use util::hash::H256;
|
||||
use util::{Mutex, RwLock, U256};
|
||||
use util::{Bytes, Mutex, RwLock, U256};
|
||||
use time::SteadyTime;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
@@ -134,16 +136,50 @@ impl Peer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Context for a network event.
|
||||
pub struct EventContext<'a> {
|
||||
/// Protocol implementation.
|
||||
pub proto: &'a LightProtocol,
|
||||
/// Network context to enable immediate response to
|
||||
/// events.
|
||||
pub io: &'a NetworkContext<'a>,
|
||||
/// Relevant peer for event.
|
||||
pub peer: PeerId,
|
||||
}
|
||||
|
||||
/// An LES event handler.
|
||||
///
|
||||
/// Each handler function takes a context which describes the relevant peer
|
||||
/// and gives references to the IO layer and protocol structure so new messages
|
||||
/// can be dispatched immediately.
|
||||
///
|
||||
/// Request responses are not guaranteed to be complete or valid, but passed IDs will be correct.
|
||||
/// Response handlers are not given a copy of the original request; it is assumed
|
||||
/// that relevant data will be stored by interested handlers.
|
||||
pub trait Handler: Send + Sync {
|
||||
/// Called when a peer connects.
|
||||
fn on_connect(&self, _id: PeerId, _status: &Status, _capabilities: &Capabilities) { }
|
||||
/// Called when a peer disconnects
|
||||
fn on_disconnect(&self, _id: PeerId) { }
|
||||
fn on_connect(&self, _ctx: EventContext, _status: &Status, _capabilities: &Capabilities) { }
|
||||
/// Called when a peer disconnects, with a list of unfulfilled request IDs as
|
||||
/// of yet.
|
||||
fn on_disconnect(&self, _ctx: EventContext, _unfulfilled: &[ReqId]) { }
|
||||
/// Called when a peer makes an announcement.
|
||||
fn on_announcement(&self, _id: PeerId, _announcement: &Announcement) { }
|
||||
fn on_announcement(&self, _ctx: EventContext, _announcement: &Announcement) { }
|
||||
/// Called when a peer requests relay of some transactions.
|
||||
fn on_transactions(&self, _id: PeerId, _relay: &[SignedTransaction]) { }
|
||||
fn on_transactions(&self, _ctx: EventContext, _relay: &[SignedTransaction]) { }
|
||||
/// Called when a peer responds with block bodies.
|
||||
fn on_block_bodies(&self, _ctx: EventContext, _req_id: ReqId, _bodies: &[Bytes]) { }
|
||||
/// Called when a peer responds with block headers.
|
||||
fn on_block_headers(&self, _ctx: EventContext, _req_id: ReqId, _headers: &[Bytes]) { }
|
||||
/// Called when a peer responds with block receipts.
|
||||
fn on_receipts(&self, _ctx: EventContext, _req_id: ReqId, _receipts: &[Vec<Receipt>]) { }
|
||||
/// Called when a peer responds with state proofs. Each proof is a series of trie
|
||||
/// nodes in ascending order by distance from the root.
|
||||
fn on_state_proofs(&self, _ctx: EventContext, _req_id: ReqId, _proofs: &[Vec<Bytes>]) { }
|
||||
/// Called when a peer responds with contract code.
|
||||
fn on_code(&self, _ctx: EventContext, _req_id: ReqId, _codes: &[Bytes]) { }
|
||||
/// Called when a peer responds with header proofs. Each proof is a series of trie
|
||||
/// nodes is ascending order by distance from the root.
|
||||
fn on_header_proofs(&self, _ctx: EventContext, _req_id: ReqId, _proofs: &[Vec<Bytes>]) { }
|
||||
}
|
||||
|
||||
// a request and the time it was made.
|
||||
@@ -290,7 +326,8 @@ impl LightProtocol {
|
||||
/// Add an event handler.
|
||||
/// Ownership will be transferred to the protocol structure,
|
||||
/// and the handler will be kept alive as long as it is.
|
||||
/// These are intended to be added at the beginning of the
|
||||
/// These are intended to be added when the protocol structure
|
||||
/// is initialized as a means of customizing its behavior.
|
||||
pub fn add_handler(&mut self, handler: Box<Handler>) {
|
||||
self.handlers.push(handler);
|
||||
}
|
||||
@@ -313,12 +350,23 @@ impl LightProtocol {
|
||||
}
|
||||
|
||||
// called when a peer disconnects.
|
||||
fn on_disconnect(&self, peer: PeerId) {
|
||||
// TODO: reassign all requests assigned to this peer.
|
||||
fn on_disconnect(&self, peer: PeerId, io: &NetworkContext) {
|
||||
self.pending_peers.write().remove(&peer);
|
||||
if self.peers.write().remove(&peer).is_some() {
|
||||
if let Some(peer_info) = self.peers.write().remove(&peer) {
|
||||
let unfulfilled: Vec<_> = peer_info.into_inner().current_asking.into_iter().map(ReqId).collect();
|
||||
{
|
||||
let mut pending = self.pending_requests.write();
|
||||
for &ReqId(ref inner) in &unfulfilled {
|
||||
pending.remove(inner);
|
||||
}
|
||||
}
|
||||
|
||||
for handler in &self.handlers {
|
||||
handler.on_disconnect(peer)
|
||||
handler.on_disconnect(EventContext {
|
||||
peer: peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
}, &unfulfilled)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -350,7 +398,7 @@ impl LightProtocol {
|
||||
}
|
||||
|
||||
// Handle status message from peer.
|
||||
fn status(&self, peer: &PeerId, data: UntrustedRlp) -> Result<(), Error> {
|
||||
fn status(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
|
||||
let pending = match self.pending_peers.write().remove(peer) {
|
||||
Some(pending) => pending,
|
||||
None => {
|
||||
@@ -377,45 +425,56 @@ impl LightProtocol {
|
||||
}));
|
||||
|
||||
for handler in &self.handlers {
|
||||
handler.on_connect(*peer, &status, &capabilities)
|
||||
handler.on_connect(EventContext {
|
||||
peer: *peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
}, &status, &capabilities)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Handle an announcement.
|
||||
fn announcement(&self, peer: &PeerId, data: UntrustedRlp) -> Result<(), Error> {
|
||||
fn announcement(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
|
||||
if !self.peers.read().contains_key(peer) {
|
||||
debug!(target: "les", "Ignoring announcement from unknown peer");
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
let announcement = try!(status::parse_announcement(data));
|
||||
let peers = self.peers.read();
|
||||
|
||||
let peer_info = match peers.get(peer) {
|
||||
Some(info) => info,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
let mut peer_info = peer_info.lock();
|
||||
|
||||
// update status.
|
||||
// scope to ensure locks are dropped before moving into handler-space.
|
||||
{
|
||||
// TODO: punish peer if they've moved backwards.
|
||||
let status = &mut peer_info.status;
|
||||
let last_head = status.head_hash;
|
||||
status.head_hash = announcement.head_hash;
|
||||
status.head_td = announcement.head_td;
|
||||
status.head_num = announcement.head_num;
|
||||
status.last_head = Some((last_head, announcement.reorg_depth));
|
||||
let peers = self.peers.read();
|
||||
let peer_info = match peers.get(peer) {
|
||||
Some(info) => info,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
let mut peer_info = peer_info.lock();
|
||||
|
||||
// update status.
|
||||
{
|
||||
// TODO: punish peer if they've moved backwards.
|
||||
let status = &mut peer_info.status;
|
||||
let last_head = status.head_hash;
|
||||
status.head_hash = announcement.head_hash;
|
||||
status.head_td = announcement.head_td;
|
||||
status.head_num = announcement.head_num;
|
||||
status.last_head = Some((last_head, announcement.reorg_depth));
|
||||
}
|
||||
|
||||
// update capabilities.
|
||||
peer_info.capabilities.update_from(&announcement);
|
||||
}
|
||||
|
||||
// update capabilities.
|
||||
peer_info.capabilities.update_from(&announcement);
|
||||
|
||||
for handler in &self.handlers {
|
||||
handler.on_announcement(*peer, &announcement);
|
||||
handler.on_announcement(EventContext {
|
||||
peer: *peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
}, &announcement);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -736,7 +795,7 @@ impl LightProtocol {
|
||||
}
|
||||
|
||||
// Receive a set of transactions to relay.
|
||||
fn relay_transactions(&self, peer: &PeerId, data: UntrustedRlp) -> Result<(), Error> {
|
||||
fn relay_transactions(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
|
||||
const MAX_TRANSACTIONS: usize = 256;
|
||||
|
||||
let txs: Vec<_> = try!(data.iter().take(MAX_TRANSACTIONS).map(|x| x.as_val::<SignedTransaction>()).collect());
|
||||
@@ -744,7 +803,11 @@ impl LightProtocol {
|
||||
debug!(target: "les", "Received {} transactions to relay from peer {}", txs.len(), peer);
|
||||
|
||||
for handler in &self.handlers {
|
||||
handler.on_transactions(*peer, &txs);
|
||||
handler.on_transactions(EventContext {
|
||||
peer: *peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
}, &txs);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -761,8 +824,8 @@ impl NetworkProtocolHandler for LightProtocol {
|
||||
|
||||
// handle the packet
|
||||
let res = match packet_id {
|
||||
packet::STATUS => self.status(peer, rlp),
|
||||
packet::ANNOUNCE => self.announcement(peer, rlp),
|
||||
packet::STATUS => self.status(peer, io, rlp),
|
||||
packet::ANNOUNCE => self.announcement(peer, io, rlp),
|
||||
|
||||
packet::GET_BLOCK_HEADERS => self.get_block_headers(peer, io, rlp),
|
||||
packet::BLOCK_HEADERS => self.block_headers(peer, io, rlp),
|
||||
@@ -782,7 +845,7 @@ impl NetworkProtocolHandler for LightProtocol {
|
||||
packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp),
|
||||
packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp),
|
||||
|
||||
packet::SEND_TRANSACTIONS => self.relay_transactions(peer, rlp),
|
||||
packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp),
|
||||
|
||||
other => {
|
||||
Err(Error::UnrecognizedPacket(other))
|
||||
@@ -809,8 +872,8 @@ impl NetworkProtocolHandler for LightProtocol {
|
||||
self.on_connect(peer, io);
|
||||
}
|
||||
|
||||
fn disconnected(&self, _io: &NetworkContext, peer: &PeerId) {
|
||||
self.on_disconnect(*peer);
|
||||
fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
|
||||
self.on_disconnect(*peer, io);
|
||||
}
|
||||
|
||||
fn timeout(&self, _io: &NetworkContext, timer: TimerToken) {
|
||||
|
||||
Reference in New Issue
Block a user