trigger event handlers, update capabilities

This commit is contained in:
Robert Habermeier 2016-11-18 15:30:06 +01:00
parent 3fabad5c0f
commit 63aa54cfc7
3 changed files with 63 additions and 56 deletions

View File

@ -1,40 +0,0 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Network events and event listeners.
use network::PeerId;
use super::{Status, Capabilities, Announcement};
use transaction::SignedTransaction;
/// Peer connected
pub struct Connect(PeerId, Status, Capabilities);
/// Peer disconnected
pub struct Disconnect(PeerId);
/// Peer announces new capabilities.
pub struct Announcement(PeerId, Announcement);
/// Transactions to be relayed.
pub struct RelayTransactions(Vec<SignedTransaction>);
/// An LES event handler.
pub trait Handler {
fn on_connect(&self, _event: Connect);
}

View File

@ -30,16 +30,15 @@ use std::sync::atomic::AtomicUsize;
use light::provider::Provider; use light::provider::Provider;
use light::request::{self, Request}; use light::request::{self, Request};
use transaction::SignedTransaction;
use self::buffer_flow::{Buffer, FlowParams}; use self::buffer_flow::{Buffer, FlowParams};
use self::error::{Error, Punishment}; use self::error::{Error, Punishment};
use self::status::{Status, Capabilities};
mod buffer_flow; mod buffer_flow;
mod error; mod error;
mod status; mod status;
pub mod event;
pub use self::status::{Status, Capabilities, Announcement}; pub use self::status::{Status, Capabilities, Announcement};
const TIMEOUT: TimerToken = 0; const TIMEOUT: TimerToken = 0;
@ -126,6 +125,18 @@ impl Peer {
} }
} }
/// An LES event handler.
pub trait Handler: Send + Sync {
/// Called when a peer connects.
fn on_connect(&self, _id: PeerId, _status: &Status, _capabilities: &Capabilities) { }
/// Called when a peer disconnects
fn on_disconnect(&self, _id: PeerId) { }
/// Called when a peer makes an announcement.
fn on_announcement(&self, _id: PeerId, _announcement: &Announcement) { }
/// Called when a peer requests relay of some transactions.
fn on_transactions(&self, _id: PeerId, _relay: &[SignedTransaction]) { }
}
/// This is an implementation of the light ethereum network protocol, abstracted /// This is an implementation of the light ethereum network protocol, abstracted
/// over a `Provider` of data and a p2p network. /// over a `Provider` of data and a p2p network.
/// ///
@ -141,6 +152,7 @@ pub struct LightProtocol {
pending_requests: RwLock<HashMap<usize, Request>>, pending_requests: RwLock<HashMap<usize, Request>>,
capabilities: RwLock<Capabilities>, capabilities: RwLock<Capabilities>,
flow_params: FlowParams, // assumed static and same for every peer. flow_params: FlowParams, // assumed static and same for every peer.
handlers: Vec<Box<Handler>>,
req_id: AtomicUsize, req_id: AtomicUsize,
} }
@ -150,6 +162,9 @@ impl LightProtocol {
pub fn make_announcement(&self, mut announcement: Announcement, io: &NetworkContext) { pub fn make_announcement(&self, mut announcement: Announcement, io: &NetworkContext) {
let mut reorgs_map = HashMap::new(); let mut reorgs_map = HashMap::new();
// update stored capabilities
self.capabilities.write().update_from(&announcement);
// calculate reorg info and send packets // calculate reorg info and send packets
for (peer_id, peer_info) in self.peers.write().iter_mut() { for (peer_id, peer_info) in self.peers.write().iter_mut() {
let reorg_depth = reorgs_map.entry(peer_info.sent_head) let reorg_depth = reorgs_map.entry(peer_info.sent_head)
@ -174,6 +189,14 @@ impl LightProtocol {
} }
} }
} }
/// Add an event handler.
/// Ownership will be transferred to the protocol structure,
/// and the handler will be kept alive as long as it is.
/// These are intended to be added at the beginning of the
pub fn add_handler(&mut self, handler: Box<Handler>) {
self.handlers.push(handler);
}
} }
impl LightProtocol { impl LightProtocol {
@ -196,7 +219,11 @@ impl LightProtocol {
fn on_disconnect(&self, peer: PeerId) { fn on_disconnect(&self, peer: PeerId) {
// TODO: reassign all requests assigned to this peer. // TODO: reassign all requests assigned to this peer.
self.pending_peers.write().remove(&peer); self.pending_peers.write().remove(&peer);
self.peers.write().remove(&peer); if self.peers.write().remove(&peer).is_some() {
for handler in &self.handlers {
handler.on_disconnect(peer)
}
}
} }
// send status to a peer. // send status to a peer.
@ -246,12 +273,16 @@ impl LightProtocol {
local_buffer: Mutex::new(self.flow_params.create_buffer()), local_buffer: Mutex::new(self.flow_params.create_buffer()),
remote_buffer: flow_params.create_buffer(), remote_buffer: flow_params.create_buffer(),
current_asking: HashSet::new(), current_asking: HashSet::new(),
status: status, status: status.clone(),
capabilities: capabilities, capabilities: capabilities.clone(),
remote_flow: flow_params, remote_flow: flow_params,
sent_head: pending.sent_head, sent_head: pending.sent_head,
}); });
for handler in &self.handlers {
handler.on_connect(*peer, &status, &capabilities)
}
Ok(()) Ok(())
} }
@ -282,15 +313,11 @@ impl LightProtocol {
} }
// update capabilities. // update capabilities.
{ peer_info.capabilities.update_from(&announcement);
let caps = &mut peer_info.capabilities;
caps.serve_headers = caps.serve_headers || announcement.serve_headers;
caps.serve_state_since = caps.serve_state_since.or(announcement.serve_state_since);
caps.serve_chain_since = caps.serve_chain_since.or(announcement.serve_chain_since);
caps.tx_relay = caps.tx_relay || announcement.tx_relay;
}
// TODO: notify listeners if new best block. for handler in &self.handlers {
handler.on_announcement(*peer, &announcement);
}
Ok(()) Ok(())
} }
@ -603,8 +630,18 @@ impl LightProtocol {
} }
// Receive a set of transactions to relay. // Receive a set of transactions to relay.
fn relay_transactions(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { fn relay_transactions(&self, peer: &PeerId, data: UntrustedRlp) -> Result<(), Error> {
unimplemented!() const MAX_TRANSACTIONS: usize = 256;
let txs: Vec<_> = try!(data.iter().take(MAX_TRANSACTIONS).map(|x| x.as_val::<SignedTransaction>()).collect());
debug!(target: "les", "Received {} transactions to relay from peer {}", txs.len(), peer);
for handler in &self.handlers {
handler.on_transactions(*peer, &txs);
}
Ok(())
} }
} }
@ -639,7 +676,7 @@ impl NetworkProtocolHandler for LightProtocol {
packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp), packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp),
packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp), packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp),
packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp), packet::SEND_TRANSACTIONS => self.relay_transactions(peer, rlp),
other => { other => {
Err(Error::UnrecognizedPacket(other)) Err(Error::UnrecognizedPacket(other))

View File

@ -201,6 +201,16 @@ impl Default for Capabilities {
} }
} }
impl Capabilities {
/// Update the capabilities from an announcement.
pub fn update_from(&mut self, announcement: &Announcement) {
self.serve_headers = self.serve_headers || announcement.serve_headers;
self.serve_state_since = self.serve_state_since.or(announcement.serve_state_since);
self.serve_chain_since = self.serve_chain_since.or(announcement.serve_chain_since);
self.tx_relay = self.tx_relay || announcement.tx_relay;
}
}
/// Attempt to parse a handshake message into its three parts: /// Attempt to parse a handshake message into its three parts:
/// - chain status /// - chain status
/// - serving capabilities /// - serving capabilities