From 25d5efac15142454386122ae3a97258bcad9da6d Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 9 Nov 2016 18:05:00 +0100 Subject: [PATCH] making announcements, clean up warnings --- ethcore/light/src/client.rs | 5 +- ethcore/light/src/net/buffer_flow.rs | 32 ++--- ethcore/light/src/net/mod.rs | 188 ++++++++++++++++++++++----- ethcore/light/src/net/status.rs | 2 +- ethcore/light/src/request.rs | 7 +- 5 files changed, 173 insertions(+), 61 deletions(-) diff --git a/ethcore/light/src/client.rs b/ethcore/light/src/client.rs index fb4cb251f..e3b5745b2 100644 --- a/ethcore/light/src/client.rs +++ b/ethcore/light/src/client.rs @@ -21,11 +21,10 @@ use std::sync::Arc; use ethcore::engines::Engine; use ethcore::ids::BlockID; -use ethcore::miner::TransactionQueue; use ethcore::service::ClientIoMessage; use ethcore::block_import_error::BlockImportError; use ethcore::block_status::BlockStatus; -use ethcore::verification::queue::{Config as QueueConfig, HeaderQueue, QueueInfo, Status}; +use ethcore::verification::queue::{HeaderQueue, QueueInfo}; use ethcore::transaction::SignedTransaction; use ethcore::blockchain_info::BlockChainInfo; @@ -62,7 +61,7 @@ impl Client { } /// Inquire about the status of a given block. - pub fn status(&self, id: BlockID) -> BlockStatus { + pub fn status(&self, _id: BlockID) -> BlockStatus { BlockStatus::Unknown } diff --git a/ethcore/light/src/net/buffer_flow.rs b/ethcore/light/src/net/buffer_flow.rs index 7ee287533..ff4c46f16 100644 --- a/ethcore/light/src/net/buffer_flow.rs +++ b/ethcore/light/src/net/buffer_flow.rs @@ -23,8 +23,9 @@ //! This module provides an interface for configuration of buffer //! flow costs and recharge rates. -use request::{self, Request}; +use request; use super::packet; +use super::error::Error; use rlp::*; use util::U256; @@ -34,10 +35,6 @@ use time::{Duration, SteadyTime}; #[derive(Debug, Clone, PartialEq, Eq)] pub struct Cost(pub U256, pub U256); -/// An error: insufficient buffer. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct InsufficientBuffer; - /// Buffer value. /// /// Produced and recharged using `FlowParams`. @@ -50,6 +47,9 @@ pub struct Buffer { } impl Buffer { + /// Get the current buffer value. + pub fn current(&self) -> U256 { self.estimate.clone() } + /// Make a definitive update. /// This will be the value obtained after receiving /// a response to a request. @@ -59,12 +59,14 @@ impl Buffer { } /// Attempt to apply the given cost to the buffer. + /// /// If successful, the cost will be deducted successfully. + /// /// If unsuccessful, the structure will be unaltered an an /// error will be produced. - pub fn deduct_cost(&mut self, cost: U256) -> Result<(), InsufficientBuffer> { + pub fn deduct_cost(&mut self, cost: U256) -> Result<(), Error> { match cost > self.estimate { - true => Err(InsufficientBuffer), + true => Err(Error::BufferEmpty), false => { self.estimate = self.estimate - cost; Ok(()) @@ -188,23 +190,9 @@ impl FlowParams { /// Get a reference to the recharge rate. pub fn recharge_rate(&self) -> &U256 { &self.recharge } - /// Estimate the maximum cost of the request. - pub fn max_cost(&self, req: &Request) -> U256 { - let amount = match *req { - Request::Headers(ref req) => req.max as usize, - Request::Bodies(ref req) => req.block_hashes.len(), - Request::Receipts(ref req) => req.block_hashes.len(), - Request::StateProofs(ref req) => req.requests.len(), - Request::Codes(ref req) => req.code_requests.len(), - Request::HeaderProofs(ref req) => req.requests.len(), - }; - - self.actual_cost(req.kind(), amount) - } - /// Compute the actual cost of a request, given the kind of request /// and number of requests made. - pub fn actual_cost(&self, kind: request::Kind, amount: usize) -> U256 { + pub fn compute_cost(&self, kind: request::Kind, amount: usize) -> U256 { let cost = match kind { request::Kind::Headers => &self.costs.headers, request::Kind::Bodies => &self.costs.bodies, diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index a721fde72..e72ce4bb2 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -20,13 +20,13 @@ //! See https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES) use io::TimerToken; -use network::{NetworkProtocolHandler, NetworkService, NetworkContext, NetworkError, PeerId}; -use rlp::{DecoderError, RlpStream, Stream, UntrustedRlp, View}; +use network::{NetworkProtocolHandler, NetworkContext, NetworkError, PeerId}; +use rlp::{RlpStream, Stream, UntrustedRlp, View}; use util::hash::H256; -use util::{U256, Mutex, RwLock}; +use util::RwLock; use std::collections::{HashMap, HashSet}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::AtomicUsize; use provider::Provider; use request::{self, Request}; @@ -39,6 +39,8 @@ mod buffer_flow; mod error; mod status; +pub use self::status::Announcement; + const TIMEOUT: TimerToken = 0; const TIMEOUT_INTERVAL_MS: u64 = 1000; @@ -120,11 +122,38 @@ pub struct LightProtocol { } impl LightProtocol { - // Check on the status of all pending requests. - fn check_pending_requests(&self) { - unimplemented!() - } + /// Make an announcement of new chain head and capabilities to all peers. + /// The announcement is expected to be valid. + pub fn make_announcement(&self, mut announcement: Announcement, io: &NetworkContext) { + let mut reorgs_map = HashMap::new(); + // calculate reorg info and send packets + for (peer_id, peer_info) in self.peers.write().iter_mut() { + let reorg_depth = reorgs_map.entry(peer_info.sent_head) + .or_insert_with(|| { + match self.provider.reorg_depth(&announcement.head_hash, &peer_info.sent_head) { + Some(depth) => depth, + None => { + // both values will always originate locally -- this means something + // has gone really wrong + debug!(target: "les", "couldn't compute reorganization depth between {:?} and {:?}", + &announcement.head_hash, &peer_info.sent_head); + 0 + } + } + }); + + peer_info.sent_head = announcement.head_hash; + announcement.reorg_depth = *reorg_depth; + + if let Err(e) = io.send(*peer_id, packet::ANNOUNCE, status::write_announcement(&announcement)) { + debug!(target: "les", "Error sending to peer {}: {}", peer_id, e); + } + } + } +} + +impl LightProtocol { // called when a peer connects. fn on_connect(&self, peer: &PeerId, io: &NetworkContext) { let peer = *peer; @@ -141,7 +170,7 @@ impl LightProtocol { } // called when a peer disconnects. - fn on_disconnect(&self, peer: PeerId, io: &NetworkContext) { + fn on_disconnect(&self, peer: PeerId) { // TODO: reassign all requests assigned to this peer. self.pending_peers.write().remove(&peer); self.peers.write().remove(&peer); @@ -174,7 +203,7 @@ impl LightProtocol { } // Handle status message from peer. - fn status(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn status(&self, peer: &PeerId, data: UntrustedRlp) -> Result<(), Error> { let pending = match self.pending_peers.write().remove(peer) { Some(pending) => pending, None => { @@ -204,7 +233,7 @@ impl LightProtocol { } // Handle an announcement. - fn announcement(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn announcement(&self, peer: &PeerId, data: UntrustedRlp) -> Result<(), Error> { if !self.peers.read().contains_key(peer) { debug!(target: "les", "Ignoring announcement from unknown peer"); return Ok(()) @@ -245,70 +274,161 @@ impl LightProtocol { // Handle a request for block headers. fn get_block_headers(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { - const MAX_HEADERS: u64 = 512; + const MAX_HEADERS: usize = 512; - unimplemented!() + let mut present_buffer = match self.peers.read().get(peer) { + Some(peer) => peer.local_buffer.clone(), + None => { + debug!(target: "les", "Ignoring announcement from unknown peer"); + return Ok(()) + } + }; + + self.flow_params.recharge(&mut present_buffer); + let req_id: u64 = try!(data.val_at(0)); + + let req = request::Headers { + block: { + let rlp = try!(data.at(1)); + (try!(rlp.val_at(0)), try!(rlp.val_at(1))) + }, + max: ::std::cmp::min(MAX_HEADERS, try!(data.val_at(2))), + skip: try!(data.val_at(3)), + reverse: try!(data.val_at(4)), + }; + + let max_cost = self.flow_params.compute_cost(request::Kind::Headers, req.max); + try!(present_buffer.deduct_cost(max_cost)); + + let response = self.provider.block_headers(req); + let actual_cost = self.flow_params.compute_cost(request::Kind::Headers, response.len()); + + let cur_buffer = match self.peers.write().get_mut(peer) { + Some(peer) => { + self.flow_params.recharge(&mut peer.local_buffer); + try!(peer.local_buffer.deduct_cost(actual_cost)); + peer.local_buffer.current() + } + None => { + debug!(target: "les", "peer disconnected during serving of request."); + return Ok(()) + } + }; + + io.respond(packet::BLOCK_HEADERS, { + let mut stream = RlpStream::new_list(response.len() + 2); + stream.append(&req_id).append(&cur_buffer); + + for header in response { + stream.append_raw(&header, 1); + } + + stream.out() + }).map_err(Into::into) } // Receive a response for block headers. - fn block_headers(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn block_headers(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { unimplemented!() } // Handle a request for block bodies. fn get_block_bodies(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { - const MAX_BODIES: usize = 512; + const MAX_BODIES: usize = 256; - unimplemented!() + let mut present_buffer = match self.peers.read().get(peer) { + Some(peer) => peer.local_buffer.clone(), + None => { + debug!(target: "les", "Ignoring announcement from unknown peer"); + return Ok(()) + } + }; + + self.flow_params.recharge(&mut present_buffer); + let req_id: u64 = try!(data.val_at(0)); + + let req = request::Bodies { + block_hashes: try!(data.iter().skip(1).take(MAX_BODIES).map(|x| x.as_val()).collect()) + }; + + let max_cost = self.flow_params.compute_cost(request::Kind::Bodies, req.block_hashes.len()); + try!(present_buffer.deduct_cost(max_cost)); + + let response = self.provider.block_bodies(req); + let response_len = response.iter().filter(|x| &x[..] != &::rlp::EMPTY_LIST_RLP).count(); + let actual_cost = self.flow_params.compute_cost(request::Kind::Bodies, response_len); + + let cur_buffer = match self.peers.write().get_mut(peer) { + Some(peer) => { + self.flow_params.recharge(&mut peer.local_buffer); + try!(peer.local_buffer.deduct_cost(actual_cost)); + peer.local_buffer.current() + } + None => { + debug!(target: "les", "peer disconnected during serving of request."); + return Ok(()) + } + }; + + io.respond(packet::BLOCK_BODIES, { + let mut stream = RlpStream::new_list(response.len() + 2); + stream.append(&req_id).append(&cur_buffer); + + for body in response { + stream.append_raw(&body, 1); + } + + stream.out() + }).map_err(Into::into) } // Receive a response for block bodies. - fn block_bodies(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn block_bodies(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { unimplemented!() } // Handle a request for receipts. - fn get_receipts(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn get_receipts(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { unimplemented!() } // Receive a response for receipts. - fn receipts(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn receipts(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { unimplemented!() } // Handle a request for proofs. - fn get_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn get_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { unimplemented!() } // Receive a response for proofs. - fn proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { unimplemented!() } // Handle a request for contract code. - fn get_contract_code(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn get_contract_code(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { unimplemented!() } // Receive a response for contract code. - fn contract_code(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn contract_code(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { unimplemented!() } // Handle a request for header proofs - fn get_header_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn get_header_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { unimplemented!() } // Receive a response for header proofs - fn header_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn header_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { unimplemented!() } // Receive a set of transactions to relay. - fn relay_transactions(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn relay_transactions(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { unimplemented!() } } @@ -320,9 +440,11 @@ impl NetworkProtocolHandler for LightProtocol { fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { let rlp = UntrustedRlp::new(data); + + // handle the packet let res = match packet_id { - packet::STATUS => self.status(peer, io, rlp), - packet::ANNOUNCE => self.announcement(peer, io, rlp), + packet::STATUS => self.status(peer, rlp), + packet::ANNOUNCE => self.announcement(peer, rlp), packet::GET_BLOCK_HEADERS => self.get_block_headers(peer, io, rlp), packet::BLOCK_HEADERS => self.block_headers(peer, io, rlp), @@ -339,6 +461,9 @@ impl NetworkProtocolHandler for LightProtocol { packet::GET_CONTRACT_CODES => self.get_contract_code(peer, io, rlp), packet::CONTRACT_CODES => self.contract_code(peer, io, rlp), + packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp), + packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp), + packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp), other => { @@ -346,6 +471,7 @@ impl NetworkProtocolHandler for LightProtocol { } }; + // if something went wrong, figure out how much to punish the peer. if let Err(e) = res { match e.punishment() { Punishment::None => {} @@ -365,11 +491,11 @@ impl NetworkProtocolHandler for LightProtocol { self.on_connect(peer, io); } - fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { - self.on_disconnect(*peer, io); + fn disconnected(&self, _io: &NetworkContext, peer: &PeerId) { + self.on_disconnect(*peer); } - fn timeout(&self, io: &NetworkContext, timer: TimerToken) { + fn timeout(&self, _io: &NetworkContext, timer: TimerToken) { match timer { TIMEOUT => { // broadcast transactions to peers. diff --git a/ethcore/light/src/net/status.rs b/ethcore/light/src/net/status.rs index 1cac14845..5aaea9f3a 100644 --- a/ethcore/light/src/net/status.rs +++ b/ethcore/light/src/net/status.rs @@ -19,7 +19,7 @@ use rlp::{DecoderError, RlpDecodable, RlpEncodable, RlpStream, Stream, UntrustedRlp, View}; use util::{H256, U256}; -use super::buffer_flow::{CostTable, FlowParams}; +use super::buffer_flow::FlowParams; // recognized handshake/announcement keys. // unknown keys are to be skipped, known keys have a defined order. diff --git a/ethcore/light/src/request.rs b/ethcore/light/src/request.rs index 11b474ee5..f043f0f25 100644 --- a/ethcore/light/src/request.rs +++ b/ethcore/light/src/request.rs @@ -18,8 +18,7 @@ // TODO: make IPC compatible. -use ethcore::transaction::Transaction; -use util::{Address, H256}; +use util::H256; /// A request for block headers. #[derive(Debug, Clone, PartialEq, Eq)] @@ -27,9 +26,9 @@ pub struct Headers { /// Block information for the request being made. pub block: (u64, H256), /// The maximum amount of headers which can be returned. - pub max: u64, + pub max: usize, /// The amount of headers to skip between each response entry. - pub skip: u64, + pub skip: usize, /// Whether the headers should proceed in falling number from the initial block. pub reverse: bool, }