diff --git a/ethcore/src/light/net/event.rs b/ethcore/src/light/net/event.rs deleted file mode 100644 index f7dee217a..000000000 --- a/ethcore/src/light/net/event.rs +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2015, 2016 Ethcore (UK) Ltd. -// This file is part of Parity. - -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . - -//! Network events and event listeners. - -use network::PeerId; - -use super::{Status, Capabilities, Announcement}; - -use transaction::SignedTransaction; - -/// Peer connected -pub struct Connect(PeerId, Status, Capabilities); - -/// Peer disconnected -pub struct Disconnect(PeerId); - -/// Peer announces new capabilities. -pub struct Announcement(PeerId, Announcement); - -/// Transactions to be relayed. -pub struct RelayTransactions(Vec); - -/// An LES event handler. -pub trait Handler { - fn on_connect(&self, _event: Connect); -} \ No newline at end of file diff --git a/ethcore/src/light/net/mod.rs b/ethcore/src/light/net/mod.rs index fc9ecbd18..8d369e7bb 100644 --- a/ethcore/src/light/net/mod.rs +++ b/ethcore/src/light/net/mod.rs @@ -30,16 +30,15 @@ use std::sync::atomic::AtomicUsize; use light::provider::Provider; use light::request::{self, Request}; +use transaction::SignedTransaction; use self::buffer_flow::{Buffer, FlowParams}; use self::error::{Error, Punishment}; -use self::status::{Status, Capabilities}; mod buffer_flow; mod error; mod status; -pub mod event; pub use self::status::{Status, Capabilities, Announcement}; const TIMEOUT: TimerToken = 0; @@ -126,6 +125,18 @@ impl Peer { } } +/// An LES event handler. +pub trait Handler: Send + Sync { + /// Called when a peer connects. + fn on_connect(&self, _id: PeerId, _status: &Status, _capabilities: &Capabilities) { } + /// Called when a peer disconnects + fn on_disconnect(&self, _id: PeerId) { } + /// Called when a peer makes an announcement. + fn on_announcement(&self, _id: PeerId, _announcement: &Announcement) { } + /// Called when a peer requests relay of some transactions. + fn on_transactions(&self, _id: PeerId, _relay: &[SignedTransaction]) { } +} + /// This is an implementation of the light ethereum network protocol, abstracted /// over a `Provider` of data and a p2p network. /// @@ -141,6 +152,7 @@ pub struct LightProtocol { pending_requests: RwLock>, capabilities: RwLock, flow_params: FlowParams, // assumed static and same for every peer. + handlers: Vec>, req_id: AtomicUsize, } @@ -150,6 +162,9 @@ impl LightProtocol { pub fn make_announcement(&self, mut announcement: Announcement, io: &NetworkContext) { let mut reorgs_map = HashMap::new(); + // update stored capabilities + self.capabilities.write().update_from(&announcement); + // calculate reorg info and send packets for (peer_id, peer_info) in self.peers.write().iter_mut() { let reorg_depth = reorgs_map.entry(peer_info.sent_head) @@ -174,6 +189,14 @@ impl LightProtocol { } } } + + /// Add an event handler. + /// Ownership will be transferred to the protocol structure, + /// and the handler will be kept alive as long as it is. + /// These are intended to be added at the beginning of the + pub fn add_handler(&mut self, handler: Box) { + self.handlers.push(handler); + } } impl LightProtocol { @@ -196,7 +219,11 @@ impl LightProtocol { fn on_disconnect(&self, peer: PeerId) { // TODO: reassign all requests assigned to this peer. self.pending_peers.write().remove(&peer); - self.peers.write().remove(&peer); + if self.peers.write().remove(&peer).is_some() { + for handler in &self.handlers { + handler.on_disconnect(peer) + } + } } // send status to a peer. @@ -246,12 +273,16 @@ impl LightProtocol { local_buffer: Mutex::new(self.flow_params.create_buffer()), remote_buffer: flow_params.create_buffer(), current_asking: HashSet::new(), - status: status, - capabilities: capabilities, + status: status.clone(), + capabilities: capabilities.clone(), remote_flow: flow_params, sent_head: pending.sent_head, }); + for handler in &self.handlers { + handler.on_connect(*peer, &status, &capabilities) + } + Ok(()) } @@ -282,15 +313,11 @@ impl LightProtocol { } // update capabilities. - { - let caps = &mut peer_info.capabilities; - caps.serve_headers = caps.serve_headers || announcement.serve_headers; - caps.serve_state_since = caps.serve_state_since.or(announcement.serve_state_since); - caps.serve_chain_since = caps.serve_chain_since.or(announcement.serve_chain_since); - caps.tx_relay = caps.tx_relay || announcement.tx_relay; - } + peer_info.capabilities.update_from(&announcement); - // TODO: notify listeners if new best block. + for handler in &self.handlers { + handler.on_announcement(*peer, &announcement); + } Ok(()) } @@ -603,8 +630,18 @@ impl LightProtocol { } // Receive a set of transactions to relay. - fn relay_transactions(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { - unimplemented!() + fn relay_transactions(&self, peer: &PeerId, data: UntrustedRlp) -> Result<(), Error> { + const MAX_TRANSACTIONS: usize = 256; + + let txs: Vec<_> = try!(data.iter().take(MAX_TRANSACTIONS).map(|x| x.as_val::()).collect()); + + debug!(target: "les", "Received {} transactions to relay from peer {}", txs.len(), peer); + + for handler in &self.handlers { + handler.on_transactions(*peer, &txs); + } + + Ok(()) } } @@ -639,7 +676,7 @@ impl NetworkProtocolHandler for LightProtocol { packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp), packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp), - packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp), + packet::SEND_TRANSACTIONS => self.relay_transactions(peer, rlp), other => { Err(Error::UnrecognizedPacket(other)) diff --git a/ethcore/src/light/net/status.rs b/ethcore/src/light/net/status.rs index 5aaea9f3a..e8f874621 100644 --- a/ethcore/src/light/net/status.rs +++ b/ethcore/src/light/net/status.rs @@ -201,6 +201,16 @@ impl Default for Capabilities { } } +impl Capabilities { + /// Update the capabilities from an announcement. + pub fn update_from(&mut self, announcement: &Announcement) { + self.serve_headers = self.serve_headers || announcement.serve_headers; + self.serve_state_since = self.serve_state_since.or(announcement.serve_state_since); + self.serve_chain_since = self.serve_chain_since.or(announcement.serve_chain_since); + self.tx_relay = self.tx_relay || announcement.tx_relay; + } +} + /// Attempt to parse a handshake message into its three parts: /// - chain status /// - serving capabilities