From 6724f574d639fb9df6b9dd9c464a8a5aecd9bdbe Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sun, 11 Dec 2016 15:40:31 +0100 Subject: [PATCH] Light server improvements and protocol adjustments (#3801) * light: basic transaction pool * light: network timeouts * fix dead code warnings * les: update to new message format * fix indentation * les: hash or number in headers req, not both --- ethcore/light/src/client.rs | 19 +- ethcore/light/src/lib.rs | 3 +- ethcore/light/src/net/context.rs | 122 +++++------ ethcore/light/src/net/mod.rs | 277 +++++++++++++++---------- ethcore/light/src/net/tests/mod.rs | 53 +++-- ethcore/light/src/provider.rs | 28 ++- ethcore/light/src/types/les_request.rs | 29 ++- 7 files changed, 312 insertions(+), 219 deletions(-) diff --git a/ethcore/light/src/client.rs b/ethcore/light/src/client.rs index 0035406dc..fcfff81e6 100644 --- a/ethcore/light/src/client.rs +++ b/ethcore/light/src/client.rs @@ -14,8 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -//! Light client implementation. Used for raw data queries as well as the header -//! sync. +//! Light client implementation. Stores data from light sync use std::sync::Arc; @@ -29,7 +28,7 @@ use ethcore::transaction::SignedTransaction; use ethcore::blockchain_info::BlockChainInfo; use io::IoChannel; -use util::hash::H256; +use util::hash::{H256, H256FastMap}; use util::{Bytes, Mutex}; use provider::Provider; @@ -37,9 +36,10 @@ use request; /// Light client implementation. pub struct Client { - engine: Arc, + _engine: Arc, header_queue: HeaderQueue, - message_channel: Mutex>, + _message_channel: Mutex>, + tx_pool: Mutex>, } impl Client { @@ -55,12 +55,17 @@ impl Client { false } + /// Import a local transaction. + pub fn import_own_transaction(&self, tx: SignedTransaction) { + self.tx_pool.lock().insert(tx.hash(), tx); + } + /// Fetch a vector of all pending transactions. pub fn pending_transactions(&self) -> Vec { - vec![] + self.tx_pool.lock().values().cloned().collect() } - /// Inquire about the status of a given block. + /// Inquire about the status of a given block (or header). pub fn status(&self, _id: BlockId) -> BlockStatus { BlockStatus::Unknown } diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index 7fa2f5911..f9008ac7c 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -28,8 +28,7 @@ //! It starts by performing a header-only sync, verifying random samples //! of members of the chain to varying degrees. -// TODO: remove when integrating with the rest of parity. -#![allow(dead_code)] +#![deny(missing_docs)] pub mod client; pub mod net; diff --git a/ethcore/light/src/net/context.rs b/ethcore/light/src/net/context.rs index c05e69b0f..96c217895 100644 --- a/ethcore/light/src/net/context.rs +++ b/ethcore/light/src/net/context.rs @@ -26,95 +26,95 @@ use request::Request; /// disconnecting peers. This is used as a generalization of the portions /// of a p2p network which the light protocol structure makes use of. pub trait IoContext { - /// Send a packet to a specific peer. - fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec); + /// Send a packet to a specific peer. + fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec); - /// Respond to a peer's message. Only works if this context is a byproduct - /// of a packet handler. - fn respond(&self, packet_id: u8, packet_body: Vec); + /// Respond to a peer's message. Only works if this context is a byproduct + /// of a packet handler. + fn respond(&self, packet_id: u8, packet_body: Vec); - /// Disconnect a peer. - fn disconnect_peer(&self, peer: PeerId); + /// Disconnect a peer. + fn disconnect_peer(&self, peer: PeerId); - /// Disable a peer -- this is a disconnect + a time-out. - fn disable_peer(&self, peer: PeerId); + /// Disable a peer -- this is a disconnect + a time-out. + fn disable_peer(&self, peer: PeerId); - /// Get a peer's protocol version. - fn protocol_version(&self, peer: PeerId) -> Option; + /// Get a peer's protocol version. + fn protocol_version(&self, peer: PeerId) -> Option; } impl<'a> IoContext for NetworkContext<'a> { - fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec) { - if let Err(e) = self.send(peer, packet_id, packet_body) { - debug!(target: "les", "Error sending packet to peer {}: {}", peer, e); - } - } + fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec) { + if let Err(e) = self.send(peer, packet_id, packet_body) { + debug!(target: "les", "Error sending packet to peer {}: {}", peer, e); + } + } - fn respond(&self, packet_id: u8, packet_body: Vec) { - if let Err(e) = self.respond(packet_id, packet_body) { - debug!(target: "les", "Error responding to peer message: {}", e); - } - } + fn respond(&self, packet_id: u8, packet_body: Vec) { + if let Err(e) = self.respond(packet_id, packet_body) { + debug!(target: "les", "Error responding to peer message: {}", e); + } + } - fn disconnect_peer(&self, peer: PeerId) { - NetworkContext::disconnect_peer(self, peer); - } + fn disconnect_peer(&self, peer: PeerId) { + NetworkContext::disconnect_peer(self, peer); + } - fn disable_peer(&self, peer: PeerId) { - NetworkContext::disable_peer(self, peer); - } + fn disable_peer(&self, peer: PeerId) { + NetworkContext::disable_peer(self, peer); + } - fn protocol_version(&self, peer: PeerId) -> Option { - self.protocol_version(self.subprotocol_name(), peer) - } + fn protocol_version(&self, peer: PeerId) -> Option { + self.protocol_version(self.subprotocol_name(), peer) + } } /// Context for a protocol event. pub trait EventContext { - /// Get the peer relevant to the event e.g. message sender, - /// disconnected/connected peer. - fn peer(&self) -> PeerId; + /// Get the peer relevant to the event e.g. message sender, + /// disconnected/connected peer. + fn peer(&self) -> PeerId; - /// Make a request from a peer. - fn request_from(&self, peer: PeerId, request: Request) -> Result; + /// Make a request from a peer. + fn request_from(&self, peer: PeerId, request: Request) -> Result; - /// Make an announcement of new capabilities to the rest of the peers. - // TODO: maybe just put this on a timer in LightProtocol? - fn make_announcement(&self, announcement: Announcement); + /// Make an announcement of new capabilities to the rest of the peers. + // TODO: maybe just put this on a timer in LightProtocol? + fn make_announcement(&self, announcement: Announcement); - /// Disconnect a peer. - fn disconnect_peer(&self, peer: PeerId); + /// Disconnect a peer. + fn disconnect_peer(&self, peer: PeerId); - /// Disable a peer. - fn disable_peer(&self, peer: PeerId); + /// Disable a peer. + fn disable_peer(&self, peer: PeerId); } /// Concrete implementation of `EventContext` over the light protocol struct and /// an io context. pub struct Ctx<'a> { - /// Io context to enable immediate response to events. - pub io: &'a IoContext, - /// Protocol implementation. - pub proto: &'a LightProtocol, - /// Relevant peer for event. - pub peer: PeerId, + /// Io context to enable immediate response to events. + pub io: &'a IoContext, + /// Protocol implementation. + pub proto: &'a LightProtocol, + /// Relevant peer for event. + pub peer: PeerId, } impl<'a> EventContext for Ctx<'a> { - fn peer(&self) -> PeerId { self.peer } - fn request_from(&self, peer: PeerId, request: Request) -> Result { - self.proto.request_from(self.io, &peer, request) - } + fn peer(&self) -> PeerId { self.peer } + fn request_from(&self, peer: PeerId, request: Request) -> Result { + self.proto.request_from(self.io, &peer, request) + } - fn make_announcement(&self, announcement: Announcement) { - self.proto.make_announcement(self.io, announcement); - } + fn make_announcement(&self, announcement: Announcement) { + self.proto.make_announcement(self.io, announcement); + } - fn disconnect_peer(&self, peer: PeerId) { - self.io.disconnect_peer(peer); - } + fn disconnect_peer(&self, peer: PeerId) { + self.io.disconnect_peer(peer); + } - fn disable_peer(&self, peer: PeerId) { - self.io.disable_peer(peer); - } + fn disable_peer(&self, peer: PeerId) { + self.io.disable_peer(peer); + } } \ No newline at end of file diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 481740a48..e5bf0cb2b 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -34,7 +34,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use provider::Provider; -use request::{self, Request}; +use request::{self, HashOrNumber, Request}; use self::buffer_flow::{Buffer, FlowParams}; use self::context::Ctx; @@ -57,13 +57,13 @@ const TIMEOUT_INTERVAL_MS: u64 = 1000; // minimum interval between updates. const UPDATE_INTERVAL_MS: i64 = 5000; -// Supported protocol versions. +/// Supported protocol versions. pub const PROTOCOL_VERSIONS: &'static [u8] = &[1]; -// Max protocol version. +/// Max protocol version. pub const MAX_PROTOCOL_VERSION: u8 = 1; -// Packet count for LES. +/// Packet count for LES. pub const PACKET_COUNT: u8 = 15; // packet ID definitions. @@ -102,6 +102,18 @@ mod packet { pub const HEADER_PROOFS: u8 = 0x0e; } +// timeouts for different kinds of requests. all values are in milliseconds. +// TODO: variable timeouts based on request count. +mod timeout { + pub const HANDSHAKE: i64 = 2500; + pub const HEADERS: i64 = 5000; + pub const BODIES: i64 = 5000; + pub const RECEIPTS: i64 = 3500; + pub const PROOFS: i64 = 4000; + pub const CONTRACT_CODES: i64 = 5000; + pub const HEADER_PROOFS: i64 = 3500; +} + /// A request id. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct ReqId(usize); @@ -111,7 +123,6 @@ pub struct ReqId(usize); struct PendingPeer { sent_head: H256, last_update: SteadyTime, - proto_version: u8, } // data about each peer. @@ -122,7 +133,6 @@ struct Peer { remote_flow: Option<(Buffer, FlowParams)>, sent_head: H256, // last head we've given them. last_update: SteadyTime, - proto_version: u8, } impl Peer { @@ -443,17 +453,54 @@ impl LightProtocol { } }; - // if something went wrong, figure out how much to punish the peer. if let Err(e) = res { - match e.punishment() { - Punishment::None => {} - Punishment::Disconnect => { - debug!(target: "les", "Disconnecting peer {}: {}", peer, e); - io.disconnect_peer(*peer) - } - Punishment::Disable => { - debug!(target: "les", "Disabling peer {}: {}", peer, e); - io.disable_peer(*peer) + punish(*peer, io, e); + } + } + + // check timeouts and punish peers. + fn timeout_check(&self, io: &IoContext) { + let now = SteadyTime::now(); + + // handshake timeout + { + let mut pending = self.pending_peers.write(); + let slowpokes: Vec<_> = pending.iter() + .filter(|&(_, ref peer)| { + peer.last_update + Duration::milliseconds(timeout::HANDSHAKE) <= now + }) + .map(|(&p, _)| p) + .collect(); + + for slowpoke in slowpokes { + debug!(target: "les", "Peer {} handshake timed out", slowpoke); + pending.remove(&slowpoke); + io.disconnect_peer(slowpoke); + } + } + + // request timeouts + { + for r in self.pending_requests.read().values() { + let kind_timeout = match r.request.kind() { + request::Kind::Headers => timeout::HEADERS, + request::Kind::Bodies => timeout::BODIES, + request::Kind::Receipts => timeout::RECEIPTS, + request::Kind::StateProofs => timeout::PROOFS, + request::Kind::Codes => timeout::CONTRACT_CODES, + request::Kind::HeaderProofs => timeout::HEADER_PROOFS, + }; + + if r.timestamp + Duration::milliseconds(kind_timeout) <= now { + debug!(target: "les", "Request for {:?} from peer {} timed out", + r.request.kind(), r.peer_id); + + // keep the request in the `pending` set for now so + // on_disconnect will pass unfulfilled ReqIds to handlers. + // in the case that a response is received after this, the + // disconnect won't be cancelled but the ReqId won't be + // marked as abandoned. + io.disconnect_peer(r.peer_id); } } } @@ -463,19 +510,37 @@ impl LightProtocol { impl LightProtocol { // called when a peer connects. fn on_connect(&self, peer: &PeerId, io: &IoContext) { - let peer = *peer; + let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) { + Ok(pv) => pv, + Err(e) => { punish(*peer, io, e); return } + }; - trace!(target: "les", "Peer {} connecting", peer); - - match self.send_status(peer, io) { - Ok(pending_peer) => { - self.pending_peers.write().insert(peer, pending_peer); - } - Err(e) => { - trace!(target: "les", "Error while sending status: {}", e); - io.disconnect_peer(peer); - } + if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() { + punish(*peer, io, Error::UnsupportedProtocolVersion(proto_version)); + return; } + + let chain_info = self.provider.chain_info(); + + let status = Status { + head_td: chain_info.total_difficulty, + head_hash: chain_info.best_block_hash, + head_num: chain_info.best_block_number, + genesis_hash: chain_info.genesis_hash, + protocol_version: proto_version as u32, // match peer proto version + network_id: self.network_id, + last_head: None, + }; + + let capabilities = self.capabilities.read().clone(); + let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params)); + + self.pending_peers.write().insert(*peer, PendingPeer { + sent_head: chain_info.best_block_hash, + last_update: SteadyTime::now(), + }); + + io.send(*peer, packet::STATUS, status_packet); } // called when a peer disconnects. @@ -508,38 +573,6 @@ impl LightProtocol { } } - // send status to a peer. - fn send_status(&self, peer: PeerId, io: &IoContext) -> Result { - let proto_version = try!(io.protocol_version(peer).ok_or(Error::WrongNetwork)); - - if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() { - return Err(Error::UnsupportedProtocolVersion(proto_version)); - } - - let chain_info = self.provider.chain_info(); - - let status = Status { - head_td: chain_info.total_difficulty, - head_hash: chain_info.best_block_hash, - head_num: chain_info.best_block_number, - genesis_hash: chain_info.genesis_hash, - protocol_version: proto_version as u32, // match peer proto version - network_id: self.network_id, - last_head: None, - }; - - let capabilities = self.capabilities.read().clone(); - let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params)); - - io.send(peer, packet::STATUS, status_packet); - - Ok(PendingPeer { - sent_head: chain_info.best_block_hash, - last_update: SteadyTime::now(), - proto_version: proto_version, - }) - } - // Handle status message from peer. fn status(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { let pending = match self.pending_peers.write().remove(peer) { @@ -570,7 +603,6 @@ impl LightProtocol { remote_flow: remote_flow, sent_head: pending.sent_head, last_update: pending.last_update, - proto_version: pending.proto_version, })); for handler in &self.handlers { @@ -630,7 +662,7 @@ impl LightProtocol { } // Handle a request for block headers. - fn get_block_headers(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { + fn get_block_headers(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { const MAX_HEADERS: usize = 512; let peers = self.peers.read(); @@ -645,18 +677,21 @@ impl LightProtocol { let mut peer = peer.lock(); let req_id: u64 = try!(data.val_at(0)); + let data = try!(data.at(1)); - let block = { - let rlp = try!(data.at(1)); - (try!(rlp.val_at(0)), try!(rlp.val_at(1))) + let start_block = { + if try!(data.at(0)).size() == 32 { + HashOrNumber::Hash(try!(data.val_at(0))) + } else { + HashOrNumber::Number(try!(data.val_at(0))) + } }; let req = request::Headers { - block_num: block.0, - block_hash: block.1, - max: ::std::cmp::min(MAX_HEADERS, try!(data.val_at(2))), - skip: try!(data.val_at(3)), - reverse: try!(data.val_at(4)), + start: start_block, + max: ::std::cmp::min(MAX_HEADERS, try!(data.val_at(1))), + skip: try!(data.val_at(2)), + reverse: try!(data.val_at(3)), }; let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Headers, req.max)); @@ -667,8 +702,8 @@ impl LightProtocol { let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost); io.respond(packet::BLOCK_HEADERS, { - let mut stream = RlpStream::new_list(response.len() + 2); - stream.append(&req_id).append(&cur_buffer); + let mut stream = RlpStream::new_list(3); + stream.append(&req_id).append(&cur_buffer).begin_list(response.len()); for header in response { stream.append_raw(&header, 1); @@ -683,7 +718,7 @@ impl LightProtocol { // Receive a response for block headers. fn block_headers(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { let req_id = try!(self.pre_verify_response(peer, request::Kind::Headers, &raw)); - let raw_headers: Vec<_> = raw.iter().skip(2).map(|x| x.as_raw().to_owned()).collect(); + let raw_headers: Vec<_> = try!(raw.at(2)).iter().map(|x| x.as_raw().to_owned()).collect(); for handler in &self.handlers { handler.on_block_headers(&Ctx { @@ -713,7 +748,7 @@ impl LightProtocol { let req_id: u64 = try!(data.val_at(0)); let req = request::Bodies { - block_hashes: try!(data.iter().skip(1).take(MAX_BODIES).map(|x| x.as_val()).collect()) + block_hashes: try!(try!(data.at(1)).iter().take(MAX_BODIES).map(|x| x.as_val()).collect()) }; let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Bodies, req.block_hashes.len())); @@ -726,8 +761,8 @@ impl LightProtocol { let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost); io.respond(packet::BLOCK_BODIES, { - let mut stream = RlpStream::new_list(response.len() + 2); - stream.append(&req_id).append(&cur_buffer); + let mut stream = RlpStream::new_list(3); + stream.append(&req_id).append(&cur_buffer).begin_list(response.len()); for body in response { stream.append_raw(&body, 1); @@ -742,7 +777,7 @@ impl LightProtocol { // Receive a response for block bodies. fn block_bodies(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { let req_id = try!(self.pre_verify_response(peer, request::Kind::Bodies, &raw)); - let raw_bodies: Vec = raw.iter().skip(2).map(|x| x.as_raw().to_owned()).collect(); + let raw_bodies: Vec = try!(raw.at(2)).iter().map(|x| x.as_raw().to_owned()).collect(); for handler in &self.handlers { handler.on_block_bodies(&Ctx { @@ -772,7 +807,7 @@ impl LightProtocol { let req_id: u64 = try!(data.val_at(0)); let req = request::Receipts { - block_hashes: try!(data.iter().skip(1).take(MAX_RECEIPTS).map(|x| x.as_val()).collect()) + block_hashes: try!(try!(data.at(1)).iter().take(MAX_RECEIPTS).map(|x| x.as_val()).collect()) }; let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Receipts, req.block_hashes.len())); @@ -785,8 +820,8 @@ impl LightProtocol { let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost); io.respond(packet::RECEIPTS, { - let mut stream = RlpStream::new_list(response.len() + 2); - stream.append(&req_id).append(&cur_buffer); + let mut stream = RlpStream::new_list(3); + stream.append(&req_id).append(&cur_buffer).begin_list(response.len()); for receipts in response { stream.append_raw(&receipts, 1); @@ -801,9 +836,8 @@ impl LightProtocol { // Receive a response for receipts. fn receipts(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { let req_id = try!(self.pre_verify_response(peer, request::Kind::Receipts, &raw)); - let raw_receipts: Vec> = try!(raw + let raw_receipts: Vec> = try!(try!(raw.at(2)) .iter() - .skip(2) .map(|x| x.as_val()) .collect()); @@ -835,7 +869,7 @@ impl LightProtocol { let req_id: u64 = try!(data.val_at(0)); let req = { - let requests: Result, Error> = data.iter().skip(1).take(MAX_PROOFS).map(|x| { + let requests: Result, Error> = try!(data.at(1)).iter().take(MAX_PROOFS).map(|x| { Ok(request::StateProof { block: try!(x.val_at(0)), key1: try!(x.val_at(1)), @@ -859,8 +893,8 @@ impl LightProtocol { let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost); io.respond(packet::PROOFS, { - let mut stream = RlpStream::new_list(response.len() + 2); - stream.append(&req_id).append(&cur_buffer); + let mut stream = RlpStream::new_list(3); + stream.append(&req_id).append(&cur_buffer).begin_list(response.len()); for proof in response { stream.append_raw(&proof, 1); @@ -876,8 +910,7 @@ impl LightProtocol { fn proofs(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { let req_id = try!(self.pre_verify_response(peer, request::Kind::StateProofs, &raw)); - let raw_proofs: Vec> = raw.iter() - .skip(2) + let raw_proofs: Vec> = try!(raw.at(2)).iter() .map(|x| x.iter().map(|node| node.as_raw().to_owned()).collect()) .collect(); @@ -909,7 +942,7 @@ impl LightProtocol { let req_id: u64 = try!(data.val_at(0)); let req = { - let requests: Result, Error> = data.iter().skip(1).take(MAX_CODES).map(|x| { + let requests: Result, Error> = try!(data.at(1)).iter().take(MAX_CODES).map(|x| { Ok(request::ContractCode { block_hash: try!(x.val_at(0)), account_key: try!(x.val_at(1)), @@ -931,8 +964,8 @@ impl LightProtocol { let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost); io.respond(packet::CONTRACT_CODES, { - let mut stream = RlpStream::new_list(response.len() + 2); - stream.append(&req_id).append(&cur_buffer); + let mut stream = RlpStream::new_list(3); + stream.append(&req_id).append(&cur_buffer).begin_list(response.len()); for code in response { stream.append(&code); @@ -948,7 +981,7 @@ impl LightProtocol { fn contract_code(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { let req_id = try!(self.pre_verify_response(peer, request::Kind::Codes, &raw)); - let raw_code: Vec = try!(raw.iter().skip(2).map(|x| x.as_val()).collect()); + let raw_code: Vec = try!(try!(raw.at(2)).iter().map(|x| x.as_val()).collect()); for handler in &self.handlers { handler.on_code(&Ctx { @@ -978,7 +1011,7 @@ impl LightProtocol { let req_id: u64 = try!(data.val_at(0)); let req = { - let requests: Result, Error> = data.iter().skip(1).take(MAX_PROOFS).map(|x| { + let requests: Result, Error> = try!(data.at(1)).iter().take(MAX_PROOFS).map(|x| { Ok(request::HeaderProof { cht_number: try!(x.val_at(0)), block_number: try!(x.val_at(1)), @@ -1001,8 +1034,8 @@ impl LightProtocol { let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost); io.respond(packet::HEADER_PROOFS, { - let mut stream = RlpStream::new_list(response.len() + 2); - stream.append(&req_id).append(&cur_buffer); + let mut stream = RlpStream::new_list(3); + stream.append(&req_id).append(&cur_buffer).begin_list(response.len()); for proof in response { stream.append_raw(&proof, 1); @@ -1023,9 +1056,8 @@ impl LightProtocol { )) } - let req_id = try!(self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw)); - let raw_proofs: Vec<_> = try!(raw.iter().skip(2).map(decode_res).collect()); + let raw_proofs: Vec<_> = try!(try!(raw.at(2)).iter().map(decode_res).collect()); for handler in &self.handlers { handler.on_header_proofs(&Ctx { @@ -1058,6 +1090,21 @@ impl LightProtocol { } } +// if something went wrong, figure out how much to punish the peer. +fn punish(peer: PeerId, io: &IoContext, e: Error) { + match e.punishment() { + Punishment::None => {} + Punishment::Disconnect => { + debug!(target: "les", "Disconnecting peer {}: {}", peer, e); + io.disconnect_peer(peer) + } + Punishment::Disable => { + debug!(target: "les", "Disabling peer {}: {}", peer, e); + io.disable_peer(peer) + } + } +} + impl NetworkProtocolHandler for LightProtocol { fn initialize(&self, io: &NetworkContext) { io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS).expect("Error registering sync timer."); @@ -1075,11 +1122,9 @@ impl NetworkProtocolHandler for LightProtocol { self.on_disconnect(*peer, io); } - fn timeout(&self, _io: &NetworkContext, timer: TimerToken) { + fn timeout(&self, io: &NetworkContext, timer: TimerToken) { match timer { - TIMEOUT => { - // broadcast transactions to peers. - } + TIMEOUT => self.timeout_check(io), _ => warn!(target: "les", "received timeout on unknown token {}", timer), } } @@ -1089,20 +1134,24 @@ impl NetworkProtocolHandler for LightProtocol { fn encode_request(req: &Request, req_id: usize) -> Vec { match *req { Request::Headers(ref headers) => { - let mut stream = RlpStream::new_list(5); + let mut stream = RlpStream::new_list(2); + stream.append(&req_id).begin_list(4); + + match headers.start { + HashOrNumber::Hash(ref hash) => stream.append(hash), + HashOrNumber::Number(ref num) => stream.append(num), + }; + stream - .append(&req_id) - .begin_list(2) - .append(&headers.block_num) - .append(&headers.block_hash) .append(&headers.max) .append(&headers.skip) .append(&headers.reverse); + stream.out() } Request::Bodies(ref request) => { - let mut stream = RlpStream::new_list(request.block_hashes.len() + 1); - stream.append(&req_id); + let mut stream = RlpStream::new_list(2); + stream.append(&req_id).begin_list(request.block_hashes.len()); for hash in &request.block_hashes { stream.append(hash); @@ -1111,8 +1160,8 @@ fn encode_request(req: &Request, req_id: usize) -> Vec { stream.out() } Request::Receipts(ref request) => { - let mut stream = RlpStream::new_list(request.block_hashes.len() + 1); - stream.append(&req_id); + let mut stream = RlpStream::new_list(2); + stream.append(&req_id).begin_list(request.block_hashes.len()); for hash in &request.block_hashes { stream.append(hash); @@ -1121,8 +1170,8 @@ fn encode_request(req: &Request, req_id: usize) -> Vec { stream.out() } Request::StateProofs(ref request) => { - let mut stream = RlpStream::new_list(request.requests.len() + 1); - stream.append(&req_id); + let mut stream = RlpStream::new_list(2); + stream.append(&req_id).begin_list(request.requests.len()); for proof_req in &request.requests { stream.begin_list(4) @@ -1140,8 +1189,8 @@ fn encode_request(req: &Request, req_id: usize) -> Vec { stream.out() } Request::Codes(ref request) => { - let mut stream = RlpStream::new_list(request.code_requests.len() + 1); - stream.append(&req_id); + let mut stream = RlpStream::new_list(2); + stream.append(&req_id).begin_list(request.code_requests.len()); for code_req in &request.code_requests { stream.begin_list(2) @@ -1152,8 +1201,8 @@ fn encode_request(req: &Request, req_id: usize) -> Vec { stream.out() } Request::HeaderProofs(ref request) => { - let mut stream = RlpStream::new_list(request.requests.len() + 1); - stream.append(&req_id); + let mut stream = RlpStream::new_list(2); + stream.append(&req_id).begin_list(request.requests.len()); for proof_req in &request.requests { stream.begin_list(3) diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index 876432ce2..7c0928cdd 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -91,16 +91,27 @@ impl Provider for TestProvider { } fn block_headers(&self, req: request::Headers) -> Vec { - let best_num = self.0.client.chain_info().best_block_number; - let start_num = req.block_num; + use request::HashOrNumber; + use ethcore::views::HeaderView; - match self.0.client.block_hash(BlockId::Number(req.block_num)) { - Some(hash) if hash == req.block_hash => {} - _=> { - trace!(target: "les_provider", "unknown/non-canonical start block in header request: {:?}", (req.block_num, req.block_hash)); - return vec![] + let best_num = self.chain_info().best_block_number; + let start_num = match req.start { + HashOrNumber::Number(start_num) => start_num, + HashOrNumber::Hash(hash) => match self.0.client.block_header(BlockId::Hash(hash)) { + None => { + return Vec::new(); + } + Some(header) => { + let num = HeaderView::new(&header).number(); + if req.max == 1 || self.0.client.block_hash(BlockId::Number(num)) != Some(hash) { + // Non-canonical header or single header requested. + return vec![header]; + } + + num + } } - } + }; (0u64..req.max as u64) .map(|x: u64| x.saturating_mul(req.skip + 1)) @@ -250,8 +261,7 @@ fn buffer_overflow() { // 1000 requests is far too many for the default flow params. let request = encode_request(&Request::Headers(Headers { - block_num: 1, - block_hash: provider.client.chain_info().genesis_hash, + start: 1.into(), max: 1000, skip: 0, reverse: false, @@ -284,8 +294,7 @@ fn get_block_headers() { } let request = Headers { - block_num: 1, - block_hash: provider.client.block_hash(BlockId::Number(1)).unwrap(), + start: 1.into(), max: 10, skip: 0, reverse: false, @@ -299,9 +308,9 @@ fn get_block_headers() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Headers, 10); - let mut response_stream = RlpStream::new_list(12); + let mut response_stream = RlpStream::new_list(3); - response_stream.append(&req_id).append(&new_buf); + response_stream.append(&req_id).append(&new_buf).begin_list(10); for header in headers { response_stream.append_raw(&header, 1); } @@ -346,9 +355,9 @@ fn get_block_bodies() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Bodies, 10); - let mut response_stream = RlpStream::new_list(12); + let mut response_stream = RlpStream::new_list(3); - response_stream.append(&req_id).append(&new_buf); + response_stream.append(&req_id).append(&new_buf).begin_list(10); for body in bodies { response_stream.append_raw(&body, 1); } @@ -399,9 +408,9 @@ fn get_block_receipts() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Receipts, receipts.len()); - let mut response_stream = RlpStream::new_list(2 + receipts.len()); + let mut response_stream = RlpStream::new_list(3); - response_stream.append(&req_id).append(&new_buf); + response_stream.append(&req_id).append(&new_buf).begin_list(receipts.len()); for block_receipts in receipts { response_stream.append_raw(&block_receipts, 1); } @@ -448,9 +457,9 @@ fn get_state_proofs() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::StateProofs, 2); - let mut response_stream = RlpStream::new_list(4); + let mut response_stream = RlpStream::new_list(3); - response_stream.append(&req_id).append(&new_buf); + response_stream.append(&req_id).append(&new_buf).begin_list(2); for proof in proofs { response_stream.append_raw(&proof, 1); } @@ -497,9 +506,9 @@ fn get_contract_code() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Codes, 2); - let mut response_stream = RlpStream::new_list(4); + let mut response_stream = RlpStream::new_list(3); - response_stream.append(&req_id).append(&new_buf); + response_stream.append(&req_id).append(&new_buf).begin_list(2); for code in codes { response_stream.append(&code); } diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs index ad8d8ea16..ed2f49f5d 100644 --- a/ethcore/light/src/provider.rs +++ b/ethcore/light/src/provider.rs @@ -97,17 +97,29 @@ impl Provider for T { } fn block_headers(&self, req: request::Headers) -> Vec { + use request::HashOrNumber; + use ethcore::views::HeaderView; + let best_num = self.chain_info().best_block_number; - let start_num = req.block_num; + let start_num = match req.start { + HashOrNumber::Number(start_num) => start_num, + HashOrNumber::Hash(hash) => match self.block_header(BlockId::Hash(hash)) { + None => { + trace!(target: "les_provider", "Unknown block hash {} requested", hash); + return Vec::new(); + } + Some(header) => { + let num = HeaderView::new(&header).number(); + if req.max == 1 || self.block_hash(BlockId::Number(num)) != Some(hash) { + // Non-canonical header or single header requested. + return vec![header]; + } - match self.block_hash(BlockId::Number(req.block_num)) { - Some(hash) if hash == req.block_hash => {} - _=> { - trace!(target: "les_provider", "unknown/non-canonical start block in header request: {:?}", (req.block_num, req.block_hash)); - return vec![] + num + } } - } - + }; + (0u64..req.max as u64) .map(|x: u64| x.saturating_mul(req.skip + 1)) .take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num >= *x }) diff --git a/ethcore/light/src/types/les_request.rs b/ethcore/light/src/types/les_request.rs index 49bd2e9cc..2c7bfb380 100644 --- a/ethcore/light/src/types/les_request.rs +++ b/ethcore/light/src/types/les_request.rs @@ -18,15 +18,34 @@ use util::H256; +/// Either a hash or a number. +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "ipc", derive(Binary))] +pub enum HashOrNumber { + /// Block hash variant. + Hash(H256), + /// Block number variant. + Number(u64), +} + +impl From for HashOrNumber { + fn from(hash: H256) -> Self { + HashOrNumber::Hash(hash) + } +} + +impl From for HashOrNumber { + fn from(num: u64) -> Self { + HashOrNumber::Number(num) + } +} + /// A request for block headers. #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "ipc", derive(Binary))] pub struct Headers { - /// Starting block number - pub block_num: u64, - /// Starting block hash. This and number could be combined but IPC codegen is - /// not robust enough to support it. - pub block_hash: H256, + /// Starting block number or hash. + pub start: HashOrNumber, /// The maximum amount of headers which can be returned. pub max: usize, /// The amount of headers to skip between each response entry.