From 10d75b6de04599880a264c07af77fa5d63fa1556 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 7 Dec 2016 15:27:04 +0100 Subject: [PATCH] light: implement all response handlers --- ethcore/light/src/net/error.rs | 6 +- ethcore/light/src/net/mod.rs | 164 ++++++++++++++++++++++++++++----- ethcore/light/src/provider.rs | 5 +- 3 files changed, 151 insertions(+), 24 deletions(-) diff --git a/ethcore/light/src/net/error.rs b/ethcore/light/src/net/error.rs index 0855cdeb8..86dbd54ba 100644 --- a/ethcore/light/src/net/error.rs +++ b/ethcore/light/src/net/error.rs @@ -54,6 +54,8 @@ pub enum Error { WrongNetwork, /// Unknown peer. UnknownPeer, + /// Unsolicited response. + UnsolicitedResponse, } impl Error { @@ -67,6 +69,7 @@ impl Error { Error::UnexpectedHandshake => Punishment::Disconnect, Error::WrongNetwork => Punishment::Disable, Error::UnknownPeer => Punishment::Disconnect, + Error::UnsolicitedResponse => Punishment::Disable, } } } @@ -92,7 +95,8 @@ impl fmt::Display for Error { Error::UnrecognizedPacket(code) => write!(f, "Unrecognized packet: 0x{:x}", code), Error::UnexpectedHandshake => write!(f, "Unexpected handshake"), Error::WrongNetwork => write!(f, "Wrong network"), - Error::UnknownPeer => write!(f, "unknown peer"), + Error::UnknownPeer => write!(f, "Unknown peer"), + Error::UnsolicitedResponse => write!(f, "Peer provided unsolicited data"), } } } \ No newline at end of file diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index e729b4361..9f08c4ca6 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -29,7 +29,7 @@ use util::hash::H256; use util::{Bytes, Mutex, RwLock, U256}; use time::SteadyTime; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use provider::Provider; @@ -103,7 +103,6 @@ struct PendingPeer { struct Peer { local_buffer: Buffer, // their buffer relative to us remote_buffer: Buffer, // our buffer relative to them - current_asking: HashSet, // pending request ids. status: Status, capabilities: Capabilities, remote_flow: FlowParams, @@ -137,6 +136,7 @@ impl Peer { } /// Context for a network event. +#[derive(Clone)] pub struct EventContext<'a> { /// Protocol implementation. pub proto: &'a LightProtocol, @@ -177,15 +177,16 @@ pub trait Handler: Send + Sync { fn on_state_proofs(&self, _ctx: EventContext, _req_id: ReqId, _proofs: &[Vec]) { } /// Called when a peer responds with contract code. fn on_code(&self, _ctx: EventContext, _req_id: ReqId, _codes: &[Bytes]) { } - /// Called when a peer responds with header proofs. Each proof is a series of trie - /// nodes is ascending order by distance from the root. - fn on_header_proofs(&self, _ctx: EventContext, _req_id: ReqId, _proofs: &[Vec]) { } + /// Called when a peer responds with header proofs. Each proof is a block header coupled + /// with a series of trie nodes is ascending order by distance from the root. + fn on_header_proofs(&self, _ctx: EventContext, _req_id: ReqId, _proofs: &[(Bytes, Vec)]) { } } -// a request and the time it was made. +// a request, the peer who it was made to, and the time it was made. struct Requested { request: Request, timestamp: SteadyTime, + peer_id: PeerId, } /// Protocol parameters. @@ -280,10 +281,10 @@ impl LightProtocol { try!(io.send(*peer_id, packet_id, packet_data)); - peer.current_asking.insert(req_id); self.pending_requests.write().insert(req_id, Requested { request: request, timestamp: SteadyTime::now(), + peer_id: *peer_id, }); Ok(ReqId(req_id)) @@ -331,6 +332,38 @@ impl LightProtocol { pub fn add_handler(&mut self, handler: Box) { self.handlers.push(handler); } + + // Does the common pre-verification of responses before the response itself + // is actually decoded: + // - check whether peer exists + // - check whether request was made + // - check whether request kinds match + fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result { + let req_id: usize = try!(raw.val_at(0)); + let cur_buffer: U256 = try!(raw.val_at(1)); + + trace!(target: "les", "pre-verifying response from peer {}, kind={:?}", peer, kind); + + match self.pending_requests.write().remove(&req_id) { + None => return Err(Error::UnsolicitedResponse), + Some(requested) => { + if requested.peer_id != *peer || requested.request.kind() != kind { + return Err(Error::UnsolicitedResponse) + } + } + } + + let peers = self.peers.read(); + match peers.get(peer) { + Some(peer_info) => { + let mut peer_info = peer_info.lock(); + let actual_buffer = ::std::cmp::min(cur_buffer, *peer_info.remote_flow.limit()); + peer_info.remote_buffer.update_to(actual_buffer); + Ok(ReqId(req_id)) + } + None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind. + } + } } impl LightProtocol { @@ -352,8 +385,13 @@ impl LightProtocol { // called when a peer disconnects. fn on_disconnect(&self, peer: PeerId, io: &NetworkContext) { self.pending_peers.write().remove(&peer); - if let Some(peer_info) = self.peers.write().remove(&peer) { - let unfulfilled: Vec<_> = peer_info.into_inner().current_asking.into_iter().map(ReqId).collect(); + if self.peers.write().remove(&peer).is_some() { + let unfulfilled: Vec<_> = self.pending_requests.read() + .iter() + .filter(|&(_, r)| r.peer_id == peer) + .map(|(&id, _)| ReqId(id)) + .collect(); + { let mut pending = self.pending_requests.write(); for &ReqId(ref inner) in &unfulfilled { @@ -417,7 +455,6 @@ impl LightProtocol { self.peers.write().insert(*peer, Mutex::new(Peer { local_buffer: self.flow_params.create_buffer(), remote_buffer: flow_params.create_buffer(), - current_asking: HashSet::new(), status: status.clone(), capabilities: capabilities.clone(), remote_flow: flow_params, @@ -530,8 +567,19 @@ impl LightProtocol { } // Receive a response for block headers. - fn block_headers(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { - unimplemented!() + fn block_headers(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> { + let req_id = try!(self.pre_verify_response(peer, request::Kind::Headers, &raw)); + let raw_headers: Vec<_> = raw.iter().skip(2).map(|x| x.as_raw().to_owned()).collect(); + + for handler in &self.handlers { + handler.on_block_headers(EventContext { + peer: *peer, + io: io, + proto: self, + }, req_id, &raw_headers); + } + + Ok(()) } // Handle a request for block bodies. @@ -576,8 +624,19 @@ impl LightProtocol { } // Receive a response for block bodies. - fn block_bodies(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { - unimplemented!() + fn block_bodies(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> { + let req_id = try!(self.pre_verify_response(peer, request::Kind::Bodies, &raw)); + let raw_bodies: Vec = raw.iter().skip(2).map(|x| x.as_raw().to_owned()).collect(); + + for handler in &self.handlers { + handler.on_block_bodies(EventContext { + peer: *peer, + io: io, + proto: self, + }, req_id, &raw_bodies); + } + + Ok(()) } // Handle a request for receipts. @@ -622,8 +681,23 @@ impl LightProtocol { } // Receive a response for receipts. - fn receipts(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { - unimplemented!() + fn receipts(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> { + let req_id = try!(self.pre_verify_response(peer, request::Kind::Receipts, &raw)); + let raw_receipts: Vec> = try!(raw + .iter() + .skip(2) + .map(|x| x.as_val()) + .collect()); + + for handler in &self.handlers { + handler.on_receipts(EventContext { + peer: *peer, + io: io, + proto: self, + }, req_id, &raw_receipts); + } + + Ok(()) } // Handle a request for proofs. @@ -679,8 +753,23 @@ impl LightProtocol { } // Receive a response for proofs. - fn proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { - unimplemented!() + fn proofs(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> { + let req_id = try!(self.pre_verify_response(peer, request::Kind::StateProofs, &raw)); + + let raw_proofs: Vec> = raw.iter() + .skip(2) + .map(|x| x.iter().map(|node| node.as_raw().to_owned()).collect()) + .collect(); + + for handler in &self.handlers { + handler.on_state_proofs(EventContext { + peer: *peer, + io: io, + proto: self, + }, req_id, &raw_proofs); + } + + Ok(()) } // Handle a request for contract code. @@ -734,8 +823,20 @@ impl LightProtocol { } // Receive a response for contract code. - fn contract_code(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { - unimplemented!() + fn contract_code(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> { + let req_id = try!(self.pre_verify_response(peer, request::Kind::Codes, &raw)); + + let raw_code: Vec = try!(raw.iter().skip(2).map(|x| x.as_val()).collect()); + + for handler in &self.handlers { + handler.on_code(EventContext { + peer: *peer, + io: io, + proto: self, + }, req_id, &raw_code); + } + + Ok(()) } // Handle a request for header proofs @@ -790,8 +891,27 @@ impl LightProtocol { } // Receive a response for header proofs - fn header_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { - unimplemented!() + fn header_proofs(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> { + fn decode_res(raw: UntrustedRlp) -> Result<(Bytes, Vec), ::rlp::DecoderError> { + Ok(( + try!(raw.val_at(0)), + try!(raw.at(1)).iter().map(|x| x.as_raw().to_owned()).collect(), + )) + } + + + let req_id = try!(self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw)); + let raw_proofs: Vec<_> = try!(raw.iter().skip(2).map(decode_res).collect()); + + for handler in &self.handlers { + handler.on_header_proofs(EventContext { + peer: *peer, + io: io, + proto: self, + }, req_id, &raw_proofs); + } + + Ok(()) } // Receive a set of transactions to relay. diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs index 264df0397..fe7156b58 100644 --- a/ethcore/light/src/provider.rs +++ b/ethcore/light/src/provider.rs @@ -71,7 +71,10 @@ pub trait Provider: Send + Sync { /// Each item in the resulting vector is either the raw bytecode or empty. fn contract_code(&self, req: request::ContractCodes) -> Vec; - /// Provide header proofs from the Canonical Hash Tries. + /// Provide header proofs from the Canonical Hash Tries as well as the headers + /// they correspond to -- each element in the returned vector is a 2-tuple. + /// The first element is a block header and the second a merkle proof of + /// the header in a requested CHT. fn header_proofs(&self, req: request::HeaderProofs) -> Vec; /// Provide pending transactions.