From 3854b8a689d6c6e7634491f60e72a7d244de0cc0 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 10 Nov 2016 18:30:17 +0100 Subject: [PATCH] LES Part 1 (#3322) * stub implementations of light client trait * Light provider trait * light client sync stubs * LES boilerplate * stub implementation of provider for client * skeleton and request traits * request definitions * new_list -> begin_list * handle unknown packet * revise light implementation strategy * make verification module public * Move all light client work to own crate * experiment with answering requests * buffer flow scaffolding * remove LESv2 requests * buffer flow basics, implement cost table * begin status module * implement handshake parsing and creation * implement announcement serialization * errors, punishment, and handshake * handle announcements * making announcements, clean up warnings * allow dead code temporarily --- ethcore/light/Cargo.toml | 16 + ethcore/light/src/client.rs | 115 +++++ ethcore/light/src/lib.rs | 47 ++ ethcore/light/src/net/buffer_flow.rs | 264 ++++++++++ ethcore/light/src/net/error.rs | 94 ++++ ethcore/light/src/net/mod.rs | 506 +++++++++++++++++++ ethcore/light/src/net/status.rs | 539 +++++++++++++++++++++ ethcore/light/src/provider.rs | 71 +++ ethcore/light/src/request.rs | 145 ++++++ ethcore/src/lib.rs | 2 +- ethcore/src/types/mod.rs.in | 2 +- ethcore/src/verification/canon_verifier.rs | 3 + ethcore/src/verification/mod.rs | 3 + ethcore/src/verification/noop_verifier.rs | 3 + ethcore/src/verification/verification.rs | 12 +- ethcore/src/verification/verifier.rs | 4 + sync/src/sync_io.rs | 4 +- util/fetch/src/lib.rs | 2 +- 18 files changed, 1821 insertions(+), 11 deletions(-) create mode 100644 ethcore/light/Cargo.toml create mode 100644 ethcore/light/src/client.rs create mode 100644 ethcore/light/src/lib.rs create mode 100644 ethcore/light/src/net/buffer_flow.rs create mode 100644 ethcore/light/src/net/error.rs create mode 100644 ethcore/light/src/net/mod.rs create mode 100644 ethcore/light/src/net/status.rs create mode 100644 ethcore/light/src/provider.rs create mode 100644 ethcore/light/src/request.rs diff --git a/ethcore/light/Cargo.toml b/ethcore/light/Cargo.toml new file mode 100644 index 000000000..daf141de7 --- /dev/null +++ b/ethcore/light/Cargo.toml @@ -0,0 +1,16 @@ +[package] +description = "Parity LES primitives" +homepage = "https://ethcore.io" +license = "GPL-3.0" +name = "ethcore-light" +version = "1.5.0" +authors = ["Ethcore "] + +[dependencies] +log = "0.3" +ethcore = { path = ".." } +ethcore-util = { path = "../../util" } +ethcore-network = { path = "../../util/network" } +ethcore-io = { path = "../../util/io" } +rlp = { path = "../../util/rlp" } +time = "0.1" \ No newline at end of file diff --git a/ethcore/light/src/client.rs b/ethcore/light/src/client.rs new file mode 100644 index 000000000..e3b5745b2 --- /dev/null +++ b/ethcore/light/src/client.rs @@ -0,0 +1,115 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Light client implementation. Used for raw data queries as well as the header +//! sync. + +use std::sync::Arc; + +use ethcore::engines::Engine; +use ethcore::ids::BlockID; +use ethcore::service::ClientIoMessage; +use ethcore::block_import_error::BlockImportError; +use ethcore::block_status::BlockStatus; +use ethcore::verification::queue::{HeaderQueue, QueueInfo}; +use ethcore::transaction::SignedTransaction; +use ethcore::blockchain_info::BlockChainInfo; + +use io::IoChannel; +use util::hash::H256; +use util::{Bytes, Mutex}; + +use provider::Provider; +use request; + +/// Light client implementation. +pub struct Client { + engine: Arc, + header_queue: HeaderQueue, + message_channel: Mutex>, +} + +impl Client { + /// Import a header as rlp-encoded bytes. + pub fn import_header(&self, bytes: Bytes) -> Result { + let header = ::rlp::decode(&bytes); + + self.header_queue.import(header).map_err(Into::into) + } + + /// Whether the block is already known (but not necessarily part of the canonical chain) + pub fn is_known(&self, _id: BlockID) -> bool { + false + } + + /// Fetch a vector of all pending transactions. + pub fn pending_transactions(&self) -> Vec { + vec![] + } + + /// Inquire about the status of a given block. + pub fn status(&self, _id: BlockID) -> BlockStatus { + BlockStatus::Unknown + } + + /// Get the header queue info. + pub fn queue_info(&self) -> QueueInfo { + self.header_queue.queue_info() + } +} + +// dummy implementation -- may draw from canonical cache further on. +impl Provider for Client { + fn chain_info(&self) -> BlockChainInfo { + unimplemented!() + } + + fn reorg_depth(&self, _a: &H256, _b: &H256) -> Option { + None + } + + fn earliest_state(&self) -> Option { + None + } + + fn block_headers(&self, _req: request::Headers) -> Vec { + Vec::new() + } + + fn block_bodies(&self, _req: request::Bodies) -> Vec { + Vec::new() + } + + fn receipts(&self, _req: request::Receipts) -> Vec { + Vec::new() + } + + fn proofs(&self, _req: request::StateProofs) -> Vec { + Vec::new() + } + + fn code(&self, _req: request::ContractCodes) -> Vec { + Vec::new() + } + + fn header_proofs(&self, _req: request::HeaderProofs) -> Vec { + Vec::new() + } + + fn pending_transactions(&self) -> Vec { + Vec::new() + } +} \ No newline at end of file diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs new file mode 100644 index 000000000..07e6833a7 --- /dev/null +++ b/ethcore/light/src/lib.rs @@ -0,0 +1,47 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Light client logic and implementation. +//! +//! A "light" client stores very little chain-related data locally +//! unlike a full node, which stores all blocks, headers, receipts, and more. +//! +//! This enables the client to have a much lower resource footprint in +//! exchange for the cost of having to ask the network for state data +//! while responding to queries. This makes a light client unsuitable for +//! low-latency applications, but perfectly suitable for simple everyday +//! use-cases like sending transactions from a personal account. +//! +//! It starts by performing a header-only sync, verifying random samples +//! of members of the chain to varying degrees. + +// TODO: remove when integrating with parity. +#![allow(dead_code)] + +pub mod client; +pub mod net; +pub mod provider; +pub mod request; + +extern crate ethcore_util as util; +extern crate ethcore_network as network; +extern crate ethcore_io as io; +extern crate ethcore; +extern crate rlp; +extern crate time; + +#[macro_use] +extern crate log; \ No newline at end of file diff --git a/ethcore/light/src/net/buffer_flow.rs b/ethcore/light/src/net/buffer_flow.rs new file mode 100644 index 000000000..b7bd30f82 --- /dev/null +++ b/ethcore/light/src/net/buffer_flow.rs @@ -0,0 +1,264 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! LES buffer flow management. +//! +//! Every request in the LES protocol leads to a reduction +//! of the requester's buffer value as a rate-limiting mechanism. +//! This buffer value will recharge at a set rate. +//! +//! This module provides an interface for configuration of buffer +//! flow costs and recharge rates. + +use request; +use super::packet; +use super::error::Error; + +use rlp::*; +use util::U256; +use time::{Duration, SteadyTime}; + +/// A request cost specification. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Cost(pub U256, pub U256); + +/// Buffer value. +/// +/// Produced and recharged using `FlowParams`. +/// Definitive updates can be made as well -- these will reset the recharge +/// point to the time of the update. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Buffer { + estimate: U256, + recharge_point: SteadyTime, +} + +impl Buffer { + /// Get the current buffer value. + pub fn current(&self) -> U256 { self.estimate.clone() } + + /// Make a definitive update. + /// This will be the value obtained after receiving + /// a response to a request. + pub fn update_to(&mut self, value: U256) { + self.estimate = value; + self.recharge_point = SteadyTime::now(); + } + + /// Attempt to apply the given cost to the buffer. + /// + /// If successful, the cost will be deducted successfully. + /// + /// If unsuccessful, the structure will be unaltered an an + /// error will be produced. + pub fn deduct_cost(&mut self, cost: U256) -> Result<(), Error> { + match cost > self.estimate { + true => Err(Error::BufferEmpty), + false => { + self.estimate = self.estimate - cost; + Ok(()) + } + } + } +} + +/// A cost table, mapping requests to base and per-request costs. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CostTable { + headers: Cost, + bodies: Cost, + receipts: Cost, + state_proofs: Cost, + contract_codes: Cost, + header_proofs: Cost, +} + +impl Default for CostTable { + fn default() -> Self { + // arbitrarily chosen constants. + CostTable { + headers: Cost(100000.into(), 10000.into()), + bodies: Cost(150000.into(), 15000.into()), + receipts: Cost(50000.into(), 5000.into()), + state_proofs: Cost(250000.into(), 25000.into()), + contract_codes: Cost(200000.into(), 20000.into()), + header_proofs: Cost(150000.into(), 15000.into()), + } + } +} + +impl RlpEncodable for CostTable { + fn rlp_append(&self, s: &mut RlpStream) { + fn append_cost(s: &mut RlpStream, msg_id: u8, cost: &Cost) { + s.begin_list(3) + .append(&msg_id) + .append(&cost.0) + .append(&cost.1); + } + + s.begin_list(6); + + append_cost(s, packet::GET_BLOCK_HEADERS, &self.headers); + append_cost(s, packet::GET_BLOCK_BODIES, &self.bodies); + append_cost(s, packet::GET_RECEIPTS, &self.receipts); + append_cost(s, packet::GET_PROOFS, &self.state_proofs); + append_cost(s, packet::GET_CONTRACT_CODES, &self.contract_codes); + append_cost(s, packet::GET_HEADER_PROOFS, &self.header_proofs); + } +} + +impl RlpDecodable for CostTable { + fn decode(decoder: &D) -> Result where D: Decoder { + let rlp = decoder.as_rlp(); + + let mut headers = None; + let mut bodies = None; + let mut receipts = None; + let mut state_proofs = None; + let mut contract_codes = None; + let mut header_proofs = None; + + for row in rlp.iter() { + let msg_id: u8 = try!(row.val_at(0)); + let cost = { + let base = try!(row.val_at(1)); + let per = try!(row.val_at(2)); + + Cost(base, per) + }; + + match msg_id { + packet::GET_BLOCK_HEADERS => headers = Some(cost), + packet::GET_BLOCK_BODIES => bodies = Some(cost), + packet::GET_RECEIPTS => receipts = Some(cost), + packet::GET_PROOFS => state_proofs = Some(cost), + packet::GET_CONTRACT_CODES => contract_codes = Some(cost), + packet::GET_HEADER_PROOFS => header_proofs = Some(cost), + _ => return Err(DecoderError::Custom("Unrecognized message in cost table")), + } + } + + Ok(CostTable { + headers: try!(headers.ok_or(DecoderError::Custom("No headers cost specified"))), + bodies: try!(bodies.ok_or(DecoderError::Custom("No bodies cost specified"))), + receipts: try!(receipts.ok_or(DecoderError::Custom("No receipts cost specified"))), + state_proofs: try!(state_proofs.ok_or(DecoderError::Custom("No proofs cost specified"))), + contract_codes: try!(contract_codes.ok_or(DecoderError::Custom("No contract codes specified"))), + header_proofs: try!(header_proofs.ok_or(DecoderError::Custom("No header proofs cost specified"))), + }) + } +} + +/// A buffer-flow manager handles costs, recharge, limits +#[derive(Debug, Clone, PartialEq)] +pub struct FlowParams { + costs: CostTable, + limit: U256, + recharge: U256, +} + +impl FlowParams { + /// Create new flow parameters from a request cost table, + /// buffer limit, and (minimum) rate of recharge. + pub fn new(limit: U256, costs: CostTable, recharge: U256) -> Self { + FlowParams { + costs: costs, + limit: limit, + recharge: recharge, + } + } + + /// Get a reference to the buffer limit. + pub fn limit(&self) -> &U256 { &self.limit } + + /// Get a reference to the cost table. + pub fn cost_table(&self) -> &CostTable { &self.costs } + + /// Get a reference to the recharge rate. + pub fn recharge_rate(&self) -> &U256 { &self.recharge } + + /// Compute the actual cost of a request, given the kind of request + /// and number of requests made. + pub fn compute_cost(&self, kind: request::Kind, amount: usize) -> U256 { + let cost = match kind { + request::Kind::Headers => &self.costs.headers, + request::Kind::Bodies => &self.costs.bodies, + request::Kind::Receipts => &self.costs.receipts, + request::Kind::StateProofs => &self.costs.state_proofs, + request::Kind::Codes => &self.costs.contract_codes, + request::Kind::HeaderProofs => &self.costs.header_proofs, + }; + + let amount: U256 = amount.into(); + cost.0 + (amount * cost.1) + } + + /// Create initial buffer parameter. + pub fn create_buffer(&self) -> Buffer { + Buffer { + estimate: self.limit, + recharge_point: SteadyTime::now(), + } + } + + /// Recharge the buffer based on time passed since last + /// update. + pub fn recharge(&self, buf: &mut Buffer) { + let now = SteadyTime::now(); + + // recompute and update only in terms of full seconds elapsed + // in order to keep the estimate as an underestimate. + let elapsed = (now - buf.recharge_point).num_seconds(); + buf.recharge_point = buf.recharge_point + Duration::seconds(elapsed); + + let elapsed: U256 = elapsed.into(); + + buf.estimate = ::std::cmp::min(self.limit, buf.estimate + (elapsed * self.recharge)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn should_serialize_cost_table() { + let costs = CostTable::default(); + let serialized = ::rlp::encode(&costs); + + let new_costs: CostTable = ::rlp::decode(&*serialized); + + assert_eq!(costs, new_costs); + } + + #[test] + fn buffer_mechanism() { + use std::thread; + use std::time::Duration; + + let flow_params = FlowParams::new(100.into(), Default::default(), 20.into()); + let mut buffer = flow_params.create_buffer(); + + assert!(buffer.deduct_cost(101.into()).is_err()); + assert!(buffer.deduct_cost(10.into()).is_ok()); + + thread::sleep(Duration::from_secs(1)); + + flow_params.recharge(&mut buffer); + + assert_eq!(buffer.estimate, 100.into()); + } +} \ No newline at end of file diff --git a/ethcore/light/src/net/error.rs b/ethcore/light/src/net/error.rs new file mode 100644 index 000000000..e15bd50d3 --- /dev/null +++ b/ethcore/light/src/net/error.rs @@ -0,0 +1,94 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Defines error types and levels of punishment to use upon +//! encountering. + +use rlp::DecoderError; +use network::NetworkError; + +use std::fmt; + +/// Levels of punishment. +/// +/// Currently just encompasses two different kinds of disconnect and +/// no punishment, but this is where reputation systems might come into play. +// In ascending order +#[derive(Debug, PartialEq, Eq)] +pub enum Punishment { + /// Perform no punishment. + None, + /// Disconnect the peer, but don't prevent them from reconnecting. + Disconnect, + /// Disconnect the peer and prevent them from reconnecting. + Disable, +} + +/// Kinds of errors which can be encountered in the course of LES. +#[derive(Debug)] +pub enum Error { + /// An RLP decoding error. + Rlp(DecoderError), + /// A network error. + Network(NetworkError), + /// Out of buffer. + BufferEmpty, + /// Unrecognized packet code. + UnrecognizedPacket(u8), + /// Unexpected handshake. + UnexpectedHandshake, + /// Peer on wrong network (wrong NetworkId or genesis hash) + WrongNetwork, +} + +impl Error { + /// What level of punishment does this error warrant? + pub fn punishment(&self) -> Punishment { + match *self { + Error::Rlp(_) => Punishment::Disable, + Error::Network(_) => Punishment::None, + Error::BufferEmpty => Punishment::Disable, + Error::UnrecognizedPacket(_) => Punishment::Disconnect, + Error::UnexpectedHandshake => Punishment::Disconnect, + Error::WrongNetwork => Punishment::Disable, + } + } +} + +impl From for Error { + fn from(err: DecoderError) -> Self { + Error::Rlp(err) + } +} + +impl From for Error { + fn from(err: NetworkError) -> Self { + Error::Network(err) + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Error::Rlp(ref err) => err.fmt(f), + Error::Network(ref err) => err.fmt(f), + Error::BufferEmpty => write!(f, "Out of buffer"), + Error::UnrecognizedPacket(code) => write!(f, "Unrecognized packet: 0x{:x}", code), + Error::UnexpectedHandshake => write!(f, "Unexpected handshake"), + Error::WrongNetwork => write!(f, "Wrong network"), + } + } +} \ No newline at end of file diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs new file mode 100644 index 000000000..e72ce4bb2 --- /dev/null +++ b/ethcore/light/src/net/mod.rs @@ -0,0 +1,506 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! LES Protocol Version 1 implementation. +//! +//! This uses a "Provider" to answer requests. +//! See https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES) + +use io::TimerToken; +use network::{NetworkProtocolHandler, NetworkContext, NetworkError, PeerId}; +use rlp::{RlpStream, Stream, UntrustedRlp, View}; +use util::hash::H256; +use util::RwLock; + +use std::collections::{HashMap, HashSet}; +use std::sync::atomic::AtomicUsize; + +use provider::Provider; +use request::{self, Request}; + +use self::buffer_flow::{Buffer, FlowParams}; +use self::error::{Error, Punishment}; +use self::status::{Status, Capabilities}; + +mod buffer_flow; +mod error; +mod status; + +pub use self::status::Announcement; + +const TIMEOUT: TimerToken = 0; +const TIMEOUT_INTERVAL_MS: u64 = 1000; + +// LPV1 +const PROTOCOL_VERSION: u32 = 1; + +// TODO [rob] make configurable. +const PROTOCOL_ID: [u8; 3] = *b"les"; + +// packet ID definitions. +mod packet { + // the status packet. + pub const STATUS: u8 = 0x00; + + // announcement of new block hashes or capabilities. + pub const ANNOUNCE: u8 = 0x01; + + // request and response for block headers + pub const GET_BLOCK_HEADERS: u8 = 0x02; + pub const BLOCK_HEADERS: u8 = 0x03; + + // request and response for block bodies + pub const GET_BLOCK_BODIES: u8 = 0x04; + pub const BLOCK_BODIES: u8 = 0x05; + + // request and response for transaction receipts. + pub const GET_RECEIPTS: u8 = 0x06; + pub const RECEIPTS: u8 = 0x07; + + // request and response for merkle proofs. + pub const GET_PROOFS: u8 = 0x08; + pub const PROOFS: u8 = 0x09; + + // request and response for contract code. + pub const GET_CONTRACT_CODES: u8 = 0x0a; + pub const CONTRACT_CODES: u8 = 0x0b; + + // relay transactions to peers. + pub const SEND_TRANSACTIONS: u8 = 0x0c; + + // request and response for header proofs in a CHT. + pub const GET_HEADER_PROOFS: u8 = 0x0d; + pub const HEADER_PROOFS: u8 = 0x0e; +} + +// A pending peer: one we've sent our status to but +// may not have received one for. +struct PendingPeer { + sent_head: H256, +} + +// data about each peer. +struct Peer { + local_buffer: Buffer, // their buffer relative to us + remote_buffer: Buffer, // our buffer relative to them + current_asking: HashSet, // pending request ids. + status: Status, + capabilities: Capabilities, + remote_flow: FlowParams, + sent_head: H256, // last head we've given them. +} + +/// This is an implementation of the light ethereum network protocol, abstracted +/// over a `Provider` of data and a p2p network. +/// +/// This is simply designed for request-response purposes. Higher level uses +/// of the protocol, such as synchronization, will function as wrappers around +/// this system. +pub struct LightProtocol { + provider: Box, + genesis_hash: H256, + network_id: status::NetworkId, + pending_peers: RwLock>, + peers: RwLock>, + pending_requests: RwLock>, + capabilities: RwLock, + flow_params: FlowParams, // assumed static and same for every peer. + req_id: AtomicUsize, +} + +impl LightProtocol { + /// Make an announcement of new chain head and capabilities to all peers. + /// The announcement is expected to be valid. + pub fn make_announcement(&self, mut announcement: Announcement, io: &NetworkContext) { + let mut reorgs_map = HashMap::new(); + + // calculate reorg info and send packets + for (peer_id, peer_info) in self.peers.write().iter_mut() { + let reorg_depth = reorgs_map.entry(peer_info.sent_head) + .or_insert_with(|| { + match self.provider.reorg_depth(&announcement.head_hash, &peer_info.sent_head) { + Some(depth) => depth, + None => { + // both values will always originate locally -- this means something + // has gone really wrong + debug!(target: "les", "couldn't compute reorganization depth between {:?} and {:?}", + &announcement.head_hash, &peer_info.sent_head); + 0 + } + } + }); + + peer_info.sent_head = announcement.head_hash; + announcement.reorg_depth = *reorg_depth; + + if let Err(e) = io.send(*peer_id, packet::ANNOUNCE, status::write_announcement(&announcement)) { + debug!(target: "les", "Error sending to peer {}: {}", peer_id, e); + } + } + } +} + +impl LightProtocol { + // called when a peer connects. + fn on_connect(&self, peer: &PeerId, io: &NetworkContext) { + let peer = *peer; + + match self.send_status(peer, io) { + Ok(pending_peer) => { + self.pending_peers.write().insert(peer, pending_peer); + } + Err(e) => { + trace!(target: "les", "Error while sending status: {}", e); + io.disconnect_peer(peer); + } + } + } + + // called when a peer disconnects. + fn on_disconnect(&self, peer: PeerId) { + // TODO: reassign all requests assigned to this peer. + self.pending_peers.write().remove(&peer); + self.peers.write().remove(&peer); + } + + // send status to a peer. + fn send_status(&self, peer: PeerId, io: &NetworkContext) -> Result { + let chain_info = self.provider.chain_info(); + + // TODO: could update capabilities here. + + let status = Status { + head_td: chain_info.total_difficulty, + head_hash: chain_info.best_block_hash, + head_num: chain_info.best_block_number, + genesis_hash: chain_info.genesis_hash, + protocol_version: PROTOCOL_VERSION, + network_id: self.network_id, + last_head: None, + }; + + let capabilities = self.capabilities.read().clone(); + let status_packet = status::write_handshake(&status, &capabilities, &self.flow_params); + + try!(io.send(peer, packet::STATUS, status_packet)); + + Ok(PendingPeer { + sent_head: chain_info.best_block_hash, + }) + } + + // Handle status message from peer. + fn status(&self, peer: &PeerId, data: UntrustedRlp) -> Result<(), Error> { + let pending = match self.pending_peers.write().remove(peer) { + Some(pending) => pending, + None => { + return Err(Error::UnexpectedHandshake); + } + }; + + let (status, capabilities, flow_params) = try!(status::parse_handshake(data)); + + trace!(target: "les", "Connected peer with chain head {:?}", (status.head_hash, status.head_num)); + + if (status.network_id, status.genesis_hash) != (self.network_id, self.genesis_hash) { + return Err(Error::WrongNetwork); + } + + self.peers.write().insert(*peer, Peer { + local_buffer: self.flow_params.create_buffer(), + remote_buffer: flow_params.create_buffer(), + current_asking: HashSet::new(), + status: status, + capabilities: capabilities, + remote_flow: flow_params, + sent_head: pending.sent_head, + }); + + Ok(()) + } + + // Handle an announcement. + fn announcement(&self, peer: &PeerId, data: UntrustedRlp) -> Result<(), Error> { + if !self.peers.read().contains_key(peer) { + debug!(target: "les", "Ignoring announcement from unknown peer"); + return Ok(()) + } + + let announcement = try!(status::parse_announcement(data)); + let mut peers = self.peers.write(); + + let peer_info = match peers.get_mut(peer) { + Some(info) => info, + None => return Ok(()), + }; + + // update status. + { + // TODO: punish peer if they've moved backwards. + let status = &mut peer_info.status; + let last_head = status.head_hash; + status.head_hash = announcement.head_hash; + status.head_td = announcement.head_td; + status.head_num = announcement.head_num; + status.last_head = Some((last_head, announcement.reorg_depth)); + } + + // update capabilities. + { + let caps = &mut peer_info.capabilities; + caps.serve_headers = caps.serve_headers || announcement.serve_headers; + caps.serve_state_since = caps.serve_state_since.or(announcement.serve_state_since); + caps.serve_chain_since = caps.serve_chain_since.or(announcement.serve_chain_since); + caps.tx_relay = caps.tx_relay || announcement.tx_relay; + } + + // TODO: notify listeners if new best block. + + Ok(()) + } + + // Handle a request for block headers. + fn get_block_headers(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + const MAX_HEADERS: usize = 512; + + let mut present_buffer = match self.peers.read().get(peer) { + Some(peer) => peer.local_buffer.clone(), + None => { + debug!(target: "les", "Ignoring announcement from unknown peer"); + return Ok(()) + } + }; + + self.flow_params.recharge(&mut present_buffer); + let req_id: u64 = try!(data.val_at(0)); + + let req = request::Headers { + block: { + let rlp = try!(data.at(1)); + (try!(rlp.val_at(0)), try!(rlp.val_at(1))) + }, + max: ::std::cmp::min(MAX_HEADERS, try!(data.val_at(2))), + skip: try!(data.val_at(3)), + reverse: try!(data.val_at(4)), + }; + + let max_cost = self.flow_params.compute_cost(request::Kind::Headers, req.max); + try!(present_buffer.deduct_cost(max_cost)); + + let response = self.provider.block_headers(req); + let actual_cost = self.flow_params.compute_cost(request::Kind::Headers, response.len()); + + let cur_buffer = match self.peers.write().get_mut(peer) { + Some(peer) => { + self.flow_params.recharge(&mut peer.local_buffer); + try!(peer.local_buffer.deduct_cost(actual_cost)); + peer.local_buffer.current() + } + None => { + debug!(target: "les", "peer disconnected during serving of request."); + return Ok(()) + } + }; + + io.respond(packet::BLOCK_HEADERS, { + let mut stream = RlpStream::new_list(response.len() + 2); + stream.append(&req_id).append(&cur_buffer); + + for header in response { + stream.append_raw(&header, 1); + } + + stream.out() + }).map_err(Into::into) + } + + // Receive a response for block headers. + fn block_headers(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { + unimplemented!() + } + + // Handle a request for block bodies. + fn get_block_bodies(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + const MAX_BODIES: usize = 256; + + let mut present_buffer = match self.peers.read().get(peer) { + Some(peer) => peer.local_buffer.clone(), + None => { + debug!(target: "les", "Ignoring announcement from unknown peer"); + return Ok(()) + } + }; + + self.flow_params.recharge(&mut present_buffer); + let req_id: u64 = try!(data.val_at(0)); + + let req = request::Bodies { + block_hashes: try!(data.iter().skip(1).take(MAX_BODIES).map(|x| x.as_val()).collect()) + }; + + let max_cost = self.flow_params.compute_cost(request::Kind::Bodies, req.block_hashes.len()); + try!(present_buffer.deduct_cost(max_cost)); + + let response = self.provider.block_bodies(req); + let response_len = response.iter().filter(|x| &x[..] != &::rlp::EMPTY_LIST_RLP).count(); + let actual_cost = self.flow_params.compute_cost(request::Kind::Bodies, response_len); + + let cur_buffer = match self.peers.write().get_mut(peer) { + Some(peer) => { + self.flow_params.recharge(&mut peer.local_buffer); + try!(peer.local_buffer.deduct_cost(actual_cost)); + peer.local_buffer.current() + } + None => { + debug!(target: "les", "peer disconnected during serving of request."); + return Ok(()) + } + }; + + io.respond(packet::BLOCK_BODIES, { + let mut stream = RlpStream::new_list(response.len() + 2); + stream.append(&req_id).append(&cur_buffer); + + for body in response { + stream.append_raw(&body, 1); + } + + stream.out() + }).map_err(Into::into) + } + + // Receive a response for block bodies. + fn block_bodies(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { + unimplemented!() + } + + // Handle a request for receipts. + fn get_receipts(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { + unimplemented!() + } + + // Receive a response for receipts. + fn receipts(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { + unimplemented!() + } + + // Handle a request for proofs. + fn get_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { + unimplemented!() + } + + // Receive a response for proofs. + fn proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { + unimplemented!() + } + + // Handle a request for contract code. + fn get_contract_code(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { + unimplemented!() + } + + // Receive a response for contract code. + fn contract_code(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { + unimplemented!() + } + + // Handle a request for header proofs + fn get_header_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { + unimplemented!() + } + + // Receive a response for header proofs + fn header_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { + unimplemented!() + } + + // Receive a set of transactions to relay. + fn relay_transactions(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { + unimplemented!() + } +} + +impl NetworkProtocolHandler for LightProtocol { + fn initialize(&self, io: &NetworkContext) { + io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS).expect("Error registering sync timer."); + } + + fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { + let rlp = UntrustedRlp::new(data); + + // handle the packet + let res = match packet_id { + packet::STATUS => self.status(peer, rlp), + packet::ANNOUNCE => self.announcement(peer, rlp), + + packet::GET_BLOCK_HEADERS => self.get_block_headers(peer, io, rlp), + packet::BLOCK_HEADERS => self.block_headers(peer, io, rlp), + + packet::GET_BLOCK_BODIES => self.get_block_bodies(peer, io, rlp), + packet::BLOCK_BODIES => self.block_bodies(peer, io, rlp), + + packet::GET_RECEIPTS => self.get_receipts(peer, io, rlp), + packet::RECEIPTS => self.receipts(peer, io, rlp), + + packet::GET_PROOFS => self.get_proofs(peer, io, rlp), + packet::PROOFS => self.proofs(peer, io, rlp), + + packet::GET_CONTRACT_CODES => self.get_contract_code(peer, io, rlp), + packet::CONTRACT_CODES => self.contract_code(peer, io, rlp), + + packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp), + packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp), + + packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp), + + other => { + Err(Error::UnrecognizedPacket(other)) + } + }; + + // if something went wrong, figure out how much to punish the peer. + if let Err(e) = res { + match e.punishment() { + Punishment::None => {} + Punishment::Disconnect => { + debug!(target: "les", "Disconnecting peer {}: {}", peer, e); + io.disconnect_peer(*peer) + } + Punishment::Disable => { + debug!(target: "les", "Disabling peer {}: {}", peer, e); + io.disable_peer(*peer) + } + } + } + } + + fn connected(&self, io: &NetworkContext, peer: &PeerId) { + self.on_connect(peer, io); + } + + fn disconnected(&self, _io: &NetworkContext, peer: &PeerId) { + self.on_disconnect(*peer); + } + + fn timeout(&self, _io: &NetworkContext, timer: TimerToken) { + match timer { + TIMEOUT => { + // broadcast transactions to peers. + } + _ => warn!(target: "les", "received timeout on unknown token {}", timer), + } + } +} \ No newline at end of file diff --git a/ethcore/light/src/net/status.rs b/ethcore/light/src/net/status.rs new file mode 100644 index 000000000..5aaea9f3a --- /dev/null +++ b/ethcore/light/src/net/status.rs @@ -0,0 +1,539 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Peer status and capabilities. + +use rlp::{DecoderError, RlpDecodable, RlpEncodable, RlpStream, Stream, UntrustedRlp, View}; +use util::{H256, U256}; + +use super::buffer_flow::FlowParams; + +// recognized handshake/announcement keys. +// unknown keys are to be skipped, known keys have a defined order. +// their string values are defined in the LES spec. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd)] +enum Key { + ProtocolVersion, + NetworkId, + HeadTD, + HeadHash, + HeadNum, + GenesisHash, + ServeHeaders, + ServeChainSince, + ServeStateSince, + TxRelay, + BufferLimit, + BufferCostTable, + BufferRechargeRate, +} + +impl Key { + // get the string value of this key. + fn as_str(&self) -> &'static str { + match *self { + Key::ProtocolVersion => "protocolVersion", + Key::NetworkId => "networkId", + Key::HeadTD => "headTd", + Key::HeadHash => "headHash", + Key::HeadNum => "headNum", + Key::GenesisHash => "genesisHash", + Key::ServeHeaders => "serveHeaders", + Key::ServeChainSince => "serveChainSince", + Key::ServeStateSince => "serveStateSince", + Key::TxRelay => "txRelay", + Key::BufferLimit => "flowControl/BL", + Key::BufferCostTable => "flowControl/MRC", + Key::BufferRechargeRate => "flowControl/MRR", + } + } + + // try to parse the key value from a string. + fn from_str(s: &str) -> Option { + match s { + "protocolVersion" => Some(Key::ProtocolVersion), + "networkId" => Some(Key::NetworkId), + "headTd" => Some(Key::HeadTD), + "headHash" => Some(Key::HeadHash), + "headNum" => Some(Key::HeadNum), + "genesisHash" => Some(Key::GenesisHash), + "serveHeaders" => Some(Key::ServeHeaders), + "serveChainSince" => Some(Key::ServeChainSince), + "serveStateSince" => Some(Key::ServeStateSince), + "txRelay" => Some(Key::TxRelay), + "flowControl/BL" => Some(Key::BufferLimit), + "flowControl/MRC" => Some(Key::BufferCostTable), + "flowControl/MRR" => Some(Key::BufferRechargeRate), + _ => None + } + } +} + +/// Network ID structure. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u32)] +pub enum NetworkId { + /// ID for the mainnet + Mainnet = 1, + /// ID for the testnet + Testnet = 0, +} + +impl NetworkId { + fn from_raw(raw: u32) -> Option { + match raw { + 0 => Some(NetworkId::Testnet), + 1 => Some(NetworkId::Mainnet), + _ => None, + } + } +} + +// helper for decoding key-value pairs in the handshake or an announcement. +struct Parser<'a> { + pos: usize, + rlp: UntrustedRlp<'a>, +} + +impl<'a> Parser<'a> { + // expect a specific next key, and decode the value. + // error on unexpected key or invalid value. + fn expect(&mut self, key: Key) -> Result { + self.expect_raw(key).and_then(|item| item.as_val()) + } + + // expect a specific next key, and get the value's RLP. + // if the key isn't found, the position isn't advanced. + fn expect_raw(&mut self, key: Key) -> Result, DecoderError> { + let pre_pos = self.pos; + if let Some((k, val)) = try!(self.get_next()) { + if k == key { return Ok(val) } + } + + self.pos = pre_pos; + Err(DecoderError::Custom("Missing expected key")) + } + + // get the next key and value RLP. + fn get_next(&mut self) -> Result)>, DecoderError> { + while self.pos < self.rlp.item_count() { + let pair = try!(self.rlp.at(self.pos)); + let k: String = try!(pair.val_at(0)); + + self.pos += 1; + match Key::from_str(&k) { + Some(key) => return Ok(Some((key , try!(pair.at(1))))), + None => continue, + } + } + + Ok(None) + } +} + +// Helper for encoding a key-value pair +fn encode_pair(key: Key, val: &T) -> Vec { + let mut s = RlpStream::new_list(2); + s.append(&key.as_str()).append(val); + s.out() +} + +// Helper for encoding a flag. +fn encode_flag(key: Key) -> Vec { + let mut s = RlpStream::new_list(2); + s.append(&key.as_str()).append_empty_data(); + s.out() +} + +/// A peer status message. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Status { + /// Protocol version. + pub protocol_version: u32, + /// Network id of this peer. + pub network_id: NetworkId, + /// Total difficulty of the head of the chain. + pub head_td: U256, + /// Hash of the best block. + pub head_hash: H256, + /// Number of the best block. + pub head_num: u64, + /// Genesis hash + pub genesis_hash: H256, + /// Last announced chain head and reorg depth to common ancestor. + pub last_head: Option<(H256, u64)>, +} + +/// Peer capabilities. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Capabilities { + /// Whether this peer can serve headers + pub serve_headers: bool, + /// Earliest block number it can serve block/receipt requests for. + pub serve_chain_since: Option, + /// Earliest block number it can serve state requests for. + pub serve_state_since: Option, + /// Whether it can relay transactions to the eth network. + pub tx_relay: bool, +} + +impl Default for Capabilities { + fn default() -> Self { + Capabilities { + serve_headers: true, + serve_chain_since: None, + serve_state_since: None, + tx_relay: false, + } + } +} + +/// Attempt to parse a handshake message into its three parts: +/// - chain status +/// - serving capabilities +/// - buffer flow parameters +pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowParams), DecoderError> { + let mut parser = Parser { + pos: 0, + rlp: rlp, + }; + + let status = Status { + protocol_version: try!(parser.expect(Key::ProtocolVersion)), + network_id: try!(parser.expect(Key::NetworkId) + .and_then(|id: u32| NetworkId::from_raw(id).ok_or(DecoderError::Custom("Invalid network ID")))), + head_td: try!(parser.expect(Key::HeadTD)), + head_hash: try!(parser.expect(Key::HeadHash)), + head_num: try!(parser.expect(Key::HeadNum)), + genesis_hash: try!(parser.expect(Key::GenesisHash)), + last_head: None, + }; + + let capabilities = Capabilities { + serve_headers: parser.expect_raw(Key::ServeHeaders).is_ok(), + serve_chain_since: parser.expect(Key::ServeChainSince).ok(), + serve_state_since: parser.expect(Key::ServeStateSince).ok(), + tx_relay: parser.expect_raw(Key::TxRelay).is_ok(), + }; + + let flow_params = FlowParams::new( + try!(parser.expect(Key::BufferLimit)), + try!(parser.expect(Key::BufferCostTable)), + try!(parser.expect(Key::BufferRechargeRate)), + ); + + Ok((status, capabilities, flow_params)) +} + +/// Write a handshake, given status, capabilities, and flow parameters. +pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params: &FlowParams) -> Vec { + let mut pairs = Vec::new(); + pairs.push(encode_pair(Key::ProtocolVersion, &status.protocol_version)); + pairs.push(encode_pair(Key::NetworkId, &(status.network_id as u32))); + pairs.push(encode_pair(Key::HeadTD, &status.head_td)); + pairs.push(encode_pair(Key::HeadHash, &status.head_hash)); + pairs.push(encode_pair(Key::HeadNum, &status.head_num)); + pairs.push(encode_pair(Key::GenesisHash, &status.genesis_hash)); + + if capabilities.serve_headers { + pairs.push(encode_flag(Key::ServeHeaders)); + } + if let Some(ref serve_chain_since) = capabilities.serve_chain_since { + pairs.push(encode_pair(Key::ServeChainSince, serve_chain_since)); + } + if let Some(ref serve_state_since) = capabilities.serve_state_since { + pairs.push(encode_pair(Key::ServeStateSince, serve_state_since)); + } + if capabilities.tx_relay { + pairs.push(encode_flag(Key::TxRelay)); + } + + pairs.push(encode_pair(Key::BufferLimit, flow_params.limit())); + pairs.push(encode_pair(Key::BufferCostTable, flow_params.cost_table())); + pairs.push(encode_pair(Key::BufferRechargeRate, flow_params.recharge_rate())); + + let mut stream = RlpStream::new_list(pairs.len()); + + for pair in pairs { + stream.append_raw(&pair, 1); + } + + stream.out() +} + +/// An announcement of new chain head or capabilities made by a peer. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Announcement { + /// Hash of the best block. + pub head_hash: H256, + /// Number of the best block. + pub head_num: u64, + /// Head total difficulty + pub head_td: U256, + /// reorg depth to common ancestor of last announced head. + pub reorg_depth: u64, + /// optional new header-serving capability. false means "no change" + pub serve_headers: bool, + /// optional new state-serving capability + pub serve_state_since: Option, + /// optional new chain-serving capability + pub serve_chain_since: Option, + /// optional new transaction-relay capability. false means "no change" + pub tx_relay: bool, + // TODO: changes in buffer flow? +} + +/// Parse an announcement. +pub fn parse_announcement(rlp: UntrustedRlp) -> Result { + let mut last_key = None; + + let mut announcement = Announcement { + head_hash: try!(rlp.val_at(0)), + head_num: try!(rlp.val_at(1)), + head_td: try!(rlp.val_at(2)), + reorg_depth: try!(rlp.val_at(3)), + serve_headers: false, + serve_state_since: None, + serve_chain_since: None, + tx_relay: false, + }; + + let mut parser = Parser { + pos: 4, + rlp: rlp, + }; + + while let Some((key, item)) = try!(parser.get_next()) { + if Some(key) <= last_key { return Err(DecoderError::Custom("Invalid announcement key ordering")) } + last_key = Some(key); + + match key { + Key::ServeHeaders => announcement.serve_headers = true, + Key::ServeStateSince => announcement.serve_state_since = Some(try!(item.as_val())), + Key::ServeChainSince => announcement.serve_chain_since = Some(try!(item.as_val())), + Key::TxRelay => announcement.tx_relay = true, + _ => return Err(DecoderError::Custom("Nonsensical key in announcement")), + } + } + + Ok(announcement) +} + +/// Write an announcement out. +pub fn write_announcement(announcement: &Announcement) -> Vec { + let mut pairs = Vec::new(); + if announcement.serve_headers { + pairs.push(encode_flag(Key::ServeHeaders)); + } + if let Some(ref serve_chain_since) = announcement.serve_chain_since { + pairs.push(encode_pair(Key::ServeChainSince, serve_chain_since)); + } + if let Some(ref serve_state_since) = announcement.serve_state_since { + pairs.push(encode_pair(Key::ServeStateSince, serve_state_since)); + } + if announcement.tx_relay { + pairs.push(encode_flag(Key::TxRelay)); + } + + let mut stream = RlpStream::new_list(4 + pairs.len()); + stream + .append(&announcement.head_hash) + .append(&announcement.head_num) + .append(&announcement.head_td) + .append(&announcement.reorg_depth); + + for item in pairs { + stream.append_raw(&item, 1); + } + + stream.out() +} + +#[cfg(test)] +mod tests { + use super::*; + use super::super::buffer_flow::FlowParams; + use util::{U256, H256, FixedHash}; + use rlp::{RlpStream, Stream ,UntrustedRlp, View}; + + #[test] + fn full_handshake() { + let status = Status { + protocol_version: 1, + network_id: NetworkId::Mainnet, + head_td: U256::default(), + head_hash: H256::default(), + head_num: 10, + genesis_hash: H256::zero(), + last_head: None, + }; + + let capabilities = Capabilities { + serve_headers: true, + serve_chain_since: Some(5), + serve_state_since: Some(8), + tx_relay: true, + }; + + let flow_params = FlowParams::new( + 1_000_000.into(), + Default::default(), + 1000.into(), + ); + + let handshake = write_handshake(&status, &capabilities, &flow_params); + + let (read_status, read_capabilities, read_flow) + = parse_handshake(UntrustedRlp::new(&handshake)).unwrap(); + + assert_eq!(read_status, status); + assert_eq!(read_capabilities, capabilities); + assert_eq!(read_flow, flow_params); + } + + #[test] + fn partial_handshake() { + let status = Status { + protocol_version: 1, + network_id: NetworkId::Mainnet, + head_td: U256::default(), + head_hash: H256::default(), + head_num: 10, + genesis_hash: H256::zero(), + last_head: None, + }; + + let capabilities = Capabilities { + serve_headers: false, + serve_chain_since: Some(5), + serve_state_since: None, + tx_relay: true, + }; + + let flow_params = FlowParams::new( + 1_000_000.into(), + Default::default(), + 1000.into(), + ); + + let handshake = write_handshake(&status, &capabilities, &flow_params); + + let (read_status, read_capabilities, read_flow) + = parse_handshake(UntrustedRlp::new(&handshake)).unwrap(); + + assert_eq!(read_status, status); + assert_eq!(read_capabilities, capabilities); + assert_eq!(read_flow, flow_params); + } + + #[test] + fn skip_unknown_keys() { + let status = Status { + protocol_version: 1, + network_id: NetworkId::Mainnet, + head_td: U256::default(), + head_hash: H256::default(), + head_num: 10, + genesis_hash: H256::zero(), + last_head: None, + }; + + let capabilities = Capabilities { + serve_headers: false, + serve_chain_since: Some(5), + serve_state_since: None, + tx_relay: true, + }; + + let flow_params = FlowParams::new( + 1_000_000.into(), + Default::default(), + 1000.into(), + ); + + let handshake = write_handshake(&status, &capabilities, &flow_params); + let interleaved = { + let handshake = UntrustedRlp::new(&handshake); + let mut stream = RlpStream::new_list(handshake.item_count() * 3); + + for item in handshake.iter() { + stream.append_raw(item.as_raw(), 1); + let (mut s1, mut s2) = (RlpStream::new_list(2), RlpStream::new_list(2)); + s1.append(&"foo").append_empty_data(); + s2.append(&"bar").append_empty_data(); + stream.append_raw(&s1.out(), 1); + stream.append_raw(&s2.out(), 1); + } + + stream.out() + }; + + let (read_status, read_capabilities, read_flow) + = parse_handshake(UntrustedRlp::new(&interleaved)).unwrap(); + + assert_eq!(read_status, status); + assert_eq!(read_capabilities, capabilities); + assert_eq!(read_flow, flow_params); + } + + #[test] + fn announcement_roundtrip() { + let announcement = Announcement { + head_hash: H256::random(), + head_num: 100_000, + head_td: 1_000_000.into(), + reorg_depth: 4, + serve_headers: false, + serve_state_since: Some(99_000), + serve_chain_since: Some(1), + tx_relay: true, + }; + + let serialized = write_announcement(&announcement); + let read = parse_announcement(UntrustedRlp::new(&serialized)).unwrap(); + + assert_eq!(read, announcement); + } + + #[test] + fn keys_out_of_order() { + use super::{Key, encode_pair, encode_flag}; + + let mut stream = RlpStream::new_list(6); + stream + .append(&H256::zero()) + .append(&10u64) + .append(&100_000u64) + .append(&2u64) + .append_raw(&encode_pair(Key::ServeStateSince, &44u64), 1) + .append_raw(&encode_flag(Key::ServeHeaders), 1); + + let out = stream.drain(); + assert!(parse_announcement(UntrustedRlp::new(&out)).is_err()); + + let mut stream = RlpStream::new_list(6); + stream + .append(&H256::zero()) + .append(&10u64) + .append(&100_000u64) + .append(&2u64) + .append_raw(&encode_flag(Key::ServeHeaders), 1) + .append_raw(&encode_pair(Key::ServeStateSince, &44u64), 1); + + let out = stream.drain(); + assert!(parse_announcement(UntrustedRlp::new(&out)).is_ok()); + } +} \ No newline at end of file diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs new file mode 100644 index 000000000..b1625f95f --- /dev/null +++ b/ethcore/light/src/provider.rs @@ -0,0 +1,71 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! A provider for the LES protocol. This is typically a full node, who can +//! give as much data as necessary to its peers. + +use ethcore::transaction::SignedTransaction; +use ethcore::blockchain_info::BlockChainInfo; +use util::{Bytes, H256}; + +use request; + +/// Defines the operations that a provider for `LES` must fulfill. +/// +/// These are defined at [1], but may be subject to change. +/// Requests which can't be fulfilled should return an empty RLP list. +/// +/// [1]: https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES) +pub trait Provider: Send + Sync { + /// Provide current blockchain info. + fn chain_info(&self) -> BlockChainInfo; + + /// Find the depth of a common ancestor between two blocks. + fn reorg_depth(&self, a: &H256, b: &H256) -> Option; + + /// Earliest state. + fn earliest_state(&self) -> Option; + + /// Provide a list of headers starting at the requested block, + /// possibly in reverse and skipping `skip` at a time. + /// + /// The returned vector may have any length in the range [0, `max`], but the + /// results within must adhere to the `skip` and `reverse` parameters. + fn block_headers(&self, req: request::Headers) -> Vec; + + /// Provide as many as possible of the requested blocks (minus the headers) encoded + /// in RLP format. + fn block_bodies(&self, req: request::Bodies) -> Vec; + + /// Provide the receipts as many as possible of the requested blocks. + /// Returns a vector of RLP-encoded lists of receipts. + fn receipts(&self, req: request::Receipts) -> Vec; + + /// Provide a set of merkle proofs, as requested. Each request is a + /// block hash and request parameters. + /// + /// Returns a vector to RLP-encoded lists satisfying the requests. + fn proofs(&self, req: request::StateProofs) -> Vec; + + /// Provide contract code for the specified (block_hash, account_hash) pairs. + fn code(&self, req: request::ContractCodes) -> Vec; + + /// Provide header proofs from the Canonical Hash Tries. + fn header_proofs(&self, req: request::HeaderProofs) -> Vec; + + /// Provide pending transactions. + fn pending_transactions(&self) -> Vec; +} \ No newline at end of file diff --git a/ethcore/light/src/request.rs b/ethcore/light/src/request.rs new file mode 100644 index 000000000..f043f0f25 --- /dev/null +++ b/ethcore/light/src/request.rs @@ -0,0 +1,145 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! LES request types. + +// TODO: make IPC compatible. + +use util::H256; + +/// A request for block headers. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Headers { + /// Block information for the request being made. + pub block: (u64, H256), + /// The maximum amount of headers which can be returned. + pub max: usize, + /// The amount of headers to skip between each response entry. + pub skip: usize, + /// Whether the headers should proceed in falling number from the initial block. + pub reverse: bool, +} + +/// A request for specific block bodies. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Bodies { + /// Hashes which bodies are being requested for. + pub block_hashes: Vec +} + +/// A request for transaction receipts. +/// +/// This request is answered with a list of transaction receipts for each block +/// requested. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Receipts { + /// Block hashes to return receipts for. + pub block_hashes: Vec, +} + +/// A request for a state proof +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StateProof { + /// Block hash to query state from. + pub block: H256, + /// Key of the state trie -- corresponds to account hash. + pub key1: H256, + /// Key in that account's storage trie; if empty, then the account RLP should be + /// returned. + pub key2: Option, + /// if greater than zero, trie nodes beyond this level may be omitted. + pub from_level: u32, // could even safely be u8; trie w/ 32-byte key can be at most 64-levels deep. +} + +/// A request for state proofs. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StateProofs { + /// All the proof requests. + pub requests: Vec, +} + +/// A request for contract code. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ContractCodes { + /// Block hash and account key (== sha3(address)) pairs to fetch code for. + pub code_requests: Vec<(H256, H256)>, +} + +/// A request for a header proof from the Canonical Hash Trie. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct HeaderProof { + /// Number of the CHT. + pub cht_number: u64, + /// Block number requested. + pub block_number: u64, + /// If greater than zero, trie nodes beyond this level may be omitted. + pub from_level: u32, +} + +/// A request for header proofs from the CHT. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct HeaderProofs { + /// All the proof requests. + pub requests: Vec, +} + +/// Kinds of requests. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Kind { + /// Requesting headers. + Headers, + /// Requesting block bodies. + Bodies, + /// Requesting transaction receipts. + Receipts, + /// Requesting proofs of state trie nodes. + StateProofs, + /// Requesting contract code by hash. + Codes, + /// Requesting header proofs (from the CHT). + HeaderProofs, +} + +/// Encompasses all possible types of requests in a single structure. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Request { + /// Requesting headers. + Headers(Headers), + /// Requesting block bodies. + Bodies(Bodies), + /// Requesting transaction receipts. + Receipts(Receipts), + /// Requesting state proofs. + StateProofs(StateProofs), + /// Requesting contract codes. + Codes(ContractCodes), + /// Requesting header proofs. + HeaderProofs(HeaderProofs), +} + +impl Request { + /// Get the kind of request this is. + pub fn kind(&self) -> Kind { + match *self { + Request::Headers(_) => Kind::Headers, + Request::Bodies(_) => Kind::Bodies, + Request::Receipts(_) => Kind::Receipts, + Request::StateProofs(_) => Kind::StateProofs, + Request::Codes(_) => Kind::Codes, + Request::HeaderProofs(_) => Kind::HeaderProofs, + } + } +} \ No newline at end of file diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index c7f40418c..bf3e59171 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -137,6 +137,7 @@ pub mod miner; pub mod snapshot; pub mod action_params; pub mod db; +pub mod verification; #[macro_use] pub mod evm; mod cache_manager; @@ -150,7 +151,6 @@ mod account_db; mod builtin; mod executive; mod externalities; -mod verification; mod blockchain; mod types; mod factory; diff --git a/ethcore/src/types/mod.rs.in b/ethcore/src/types/mod.rs.in index 1d2cdb3c0..6ef67009a 100644 --- a/ethcore/src/types/mod.rs.in +++ b/ethcore/src/types/mod.rs.in @@ -33,4 +33,4 @@ pub mod transaction_import; pub mod block_import_error; pub mod restoration_status; pub mod snapshot_manifest; -pub mod mode; \ No newline at end of file +pub mod mode; diff --git a/ethcore/src/verification/canon_verifier.rs b/ethcore/src/verification/canon_verifier.rs index cc6bc448a..b5b01279e 100644 --- a/ethcore/src/verification/canon_verifier.rs +++ b/ethcore/src/verification/canon_verifier.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +//! Canonical verifier. + use blockchain::BlockProvider; use engines::Engine; use error::Error; @@ -21,6 +23,7 @@ use header::Header; use super::Verifier; use super::verification; +/// A canonial verifier -- this does full verification. pub struct CanonVerifier; impl Verifier for CanonVerifier { diff --git a/ethcore/src/verification/mod.rs b/ethcore/src/verification/mod.rs index 239c88597..55663052b 100644 --- a/ethcore/src/verification/mod.rs +++ b/ethcore/src/verification/mod.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +//! Block verification utilities. + pub mod verification; pub mod verifier; pub mod queue; @@ -44,6 +46,7 @@ impl Default for VerifierType { } } +/// Create a new verifier based on type. pub fn new(v: VerifierType) -> Box { match v { VerifierType::Canon | VerifierType::CanonNoSeal => Box::new(CanonVerifier), diff --git a/ethcore/src/verification/noop_verifier.rs b/ethcore/src/verification/noop_verifier.rs index fb798be46..7db688a85 100644 --- a/ethcore/src/verification/noop_verifier.rs +++ b/ethcore/src/verification/noop_verifier.rs @@ -14,12 +14,15 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +//! No-op verifier. + use blockchain::BlockProvider; use engines::Engine; use error::Error; use header::Header; use super::Verifier; +/// A no-op verifier -- this will verify everything it's given immediately. #[allow(dead_code)] pub struct NoopVerifier; diff --git a/ethcore/src/verification/verification.rs b/ethcore/src/verification/verification.rs index bb9f042ae..47b2e16de 100644 --- a/ethcore/src/verification/verification.rs +++ b/ethcore/src/verification/verification.rs @@ -14,12 +14,12 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -/// Block and transaction verification functions -/// -/// Block verification is done in 3 steps -/// 1. Quick verification upon adding to the block queue -/// 2. Signatures verification done in the queue. -/// 3. Final verification against the blockchain done before enactment. +//! Block and transaction verification functions +//! +//! Block verification is done in 3 steps +//! 1. Quick verification upon adding to the block queue +//! 2. Signatures verification done in the queue. +//! 3. Final verification against the blockchain done before enactment. use util::*; use engines::Engine; diff --git a/ethcore/src/verification/verifier.rs b/ethcore/src/verification/verifier.rs index 7f57407f7..05d488f95 100644 --- a/ethcore/src/verification/verifier.rs +++ b/ethcore/src/verification/verifier.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +//! A generic verifier trait. + use blockchain::BlockProvider; use engines::Engine; use error::Error; @@ -21,6 +23,8 @@ use header::Header; /// Should be used to verify blocks. pub trait Verifier: Send + Sync { + /// Verify a block relative to its parent and uncles. fn verify_block_family(&self, header: &Header, bytes: &[u8], engine: &Engine, bc: &BlockProvider) -> Result<(), Error>; + /// Do a final verification check for an enacted header vs its expected counterpart. fn verify_block_final(&self, expected: &Header, got: &Header) -> Result<(), Error>; } diff --git a/sync/src/sync_io.rs b/sync/src/sync_io.rs index c78074aed..8dc8c65c0 100644 --- a/sync/src/sync_io.rs +++ b/sync/src/sync_io.rs @@ -22,7 +22,7 @@ use ethcore::header::BlockNumber; use ethcore::snapshot::SnapshotService; use parking_lot::RwLock; -/// IO interface for the syning handler. +/// IO interface for the syncing handler. /// Provides peer connection management and an interface to the blockchain client. // TODO: ratings pub trait SyncIo { @@ -70,7 +70,7 @@ pub struct NetSyncIo<'s, 'h> where 'h: 's { impl<'s, 'h> NetSyncIo<'s, 'h> { /// Creates a new instance from the `NetworkContext` and the blockchain client reference. - pub fn new(network: &'s NetworkContext<'h>, + pub fn new(network: &'s NetworkContext<'h>, chain: &'s BlockChainClient, snapshot_service: &'s SnapshotService, chain_overlay: &'s RwLock>) -> NetSyncIo<'s, 'h> { diff --git a/util/fetch/src/lib.rs b/util/fetch/src/lib.rs index 8ec9e0ddd..7ab38604b 100644 --- a/util/fetch/src/lib.rs +++ b/util/fetch/src/lib.rs @@ -26,4 +26,4 @@ extern crate rand; pub mod client; pub mod fetch_file; -pub use self::client::{Client, Fetch, FetchError, FetchResult}; +pub use self::client::{Client, Fetch, FetchError, FetchResult}; \ No newline at end of file