diff --git a/ethcore/src/light/net/buffer_flow.rs b/ethcore/src/light/net/buffer_flow.rs index 62f351515..8d6835db3 100644 --- a/ethcore/src/light/net/buffer_flow.rs +++ b/ethcore/src/light/net/buffer_flow.rs @@ -228,6 +228,16 @@ impl FlowParams { buf.estimate = ::std::cmp::min(self.limit, buf.estimate + (elapsed * self.recharge)); } + + /// Refund some buffer which was previously deducted. + /// Does not update the recharge timestamp. + pub fn refund(&self, buf: &mut Buffer, refund_amount: U256) { + buf.estimate = buf.estimate + refund_amount; + + if buf.estimate > self.limit { + buf.estimate = self.limit + } + } } #[cfg(test)] diff --git a/ethcore/src/light/net/mod.rs b/ethcore/src/light/net/mod.rs index 47146a2f9..1d7fc8f2d 100644 --- a/ethcore/src/light/net/mod.rs +++ b/ethcore/src/light/net/mod.rs @@ -23,7 +23,7 @@ use io::TimerToken; use network::{NetworkProtocolHandler, NetworkContext, NetworkError, PeerId}; use rlp::{RlpStream, Stream, UntrustedRlp, View}; use util::hash::H256; -use util::RwLock; +use util::{Mutex, RwLock, U256}; use std::collections::{HashMap, HashSet}; use std::sync::atomic::AtomicUsize; @@ -94,7 +94,7 @@ struct PendingPeer { // data about each peer. struct Peer { - local_buffer: Buffer, // their buffer relative to us + local_buffer: Mutex, // their buffer relative to us remote_buffer: Buffer, // our buffer relative to them current_asking: HashSet, // pending request ids. status: Status, @@ -103,6 +103,28 @@ struct Peer { sent_head: H256, // last head we've given them. } +impl Peer { + // check the maximum cost of a request, returning an error if there's + // not enough buffer left. + // returns the calculated maximum cost. + fn deduct_max(&self, flow_params: &FlowParams, kind: request::Kind, max: usize) -> Result { + let mut local_buffer = self.local_buffer.lock(); + flow_params.recharge(&mut local_buffer); + + let max_cost = flow_params.compute_cost(kind, max); + try!(local_buffer.deduct_cost(max_cost)); + Ok(max_cost) + } + + // refund buffer for a request. returns new buffer amount. + fn refund(&self, flow_params: &FlowParams, amount: U256) -> U256 { + let mut local_buffer = self.local_buffer.lock(); + flow_params.refund(&mut local_buffer, amount); + + local_buffer.current() + } +} + /// This is an implementation of the light ethereum network protocol, abstracted /// over a `Provider` of data and a p2p network. /// @@ -220,7 +242,7 @@ impl LightProtocol { } self.peers.write().insert(*peer, Peer { - local_buffer: self.flow_params.create_buffer(), + local_buffer: Mutex::new(self.flow_params.create_buffer()), remote_buffer: flow_params.create_buffer(), current_asking: HashSet::new(), status: status, @@ -276,15 +298,15 @@ impl LightProtocol { fn get_block_headers(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { const MAX_HEADERS: usize = 512; - let mut present_buffer = match self.peers.read().get(peer) { - Some(peer) => peer.local_buffer.clone(), + let peers = self.peers.read(); + let peer = match peers.get(peer) { + Some(peer) => peer, None => { - debug!(target: "les", "Ignoring announcement from unknown peer"); + debug!(target: "les", "Ignoring request from unknown peer"); return Ok(()) } }; - self.flow_params.recharge(&mut present_buffer); let req_id: u64 = try!(data.val_at(0)); let block = { @@ -300,24 +322,13 @@ impl LightProtocol { reverse: try!(data.val_at(4)), }; - let max_cost = self.flow_params.compute_cost(request::Kind::Headers, req.max); - try!(present_buffer.deduct_cost(max_cost)); + let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Headers, req.max)); let response = self.provider.block_headers(req); let actual_cost = self.flow_params.compute_cost(request::Kind::Headers, response.len()); + assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost."); - let cur_buffer = match self.peers.write().get_mut(peer) { - Some(peer) => { - self.flow_params.recharge(&mut peer.local_buffer); - try!(peer.local_buffer.deduct_cost(actual_cost)); - peer.local_buffer.current() - } - None => { - debug!(target: "les", "peer disconnected during serving of request."); - return Ok(()) - } - }; - + let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost); io.respond(packet::BLOCK_HEADERS, { let mut stream = RlpStream::new_list(response.len() + 2); stream.append(&req_id).append(&cur_buffer); @@ -339,39 +350,29 @@ impl LightProtocol { fn get_block_bodies(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { const MAX_BODIES: usize = 256; - let mut present_buffer = match self.peers.read().get(peer) { - Some(peer) => peer.local_buffer.clone(), + let peers = self.peers.read(); + let peer = match peers.get(peer) { + Some(peer) => peer, None => { - debug!(target: "les", "Ignoring announcement from unknown peer"); + debug!(target: "les", "Ignoring request from unknown peer"); return Ok(()) } }; - self.flow_params.recharge(&mut present_buffer); let req_id: u64 = try!(data.val_at(0)); let req = request::Bodies { block_hashes: try!(data.iter().skip(1).take(MAX_BODIES).map(|x| x.as_val()).collect()) }; - let max_cost = self.flow_params.compute_cost(request::Kind::Bodies, req.block_hashes.len()); - try!(present_buffer.deduct_cost(max_cost)); + let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Bodies, req.block_hashes.len())); let response = self.provider.block_bodies(req); let response_len = response.iter().filter(|x| &x[..] != &::rlp::EMPTY_LIST_RLP).count(); let actual_cost = self.flow_params.compute_cost(request::Kind::Bodies, response_len); + assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost."); - let cur_buffer = match self.peers.write().get_mut(peer) { - Some(peer) => { - self.flow_params.recharge(&mut peer.local_buffer); - try!(peer.local_buffer.deduct_cost(actual_cost)); - peer.local_buffer.current() - } - None => { - debug!(target: "les", "peer disconnected during serving of request."); - return Ok(()) - } - }; + let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost); io.respond(packet::BLOCK_BODIES, { let mut stream = RlpStream::new_list(response.len() + 2); @@ -391,8 +392,43 @@ impl LightProtocol { } // Handle a request for receipts. - fn get_receipts(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { - unimplemented!() + fn get_receipts(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + const MAX_RECEIPTS: usize = 256; + + let peers = self.peers.read(); + let peer = match peers.get(peer) { + Some(peer) => peer, + None => { + debug!(target: "les", "Ignoring request from unknown peer"); + return Ok(()) + } + }; + + let req_id: u64 = try!(data.val_at(0)); + + let req = request::Receipts { + block_hashes: try!(data.iter().skip(1).take(MAX_RECEIPTS).map(|x| x.as_val()).collect()) + }; + + let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Receipts, req.block_hashes.len())); + + let response = self.provider.receipts(req); + let response_len = response.iter().filter(|x| &x[..] != &::rlp::EMPTY_LIST_RLP).count(); + let actual_cost = self.flow_params.compute_cost(request::Kind::Receipts, response_len); + assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost."); + + let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost); + + io.respond(packet::RECEIPTS, { + let mut stream = RlpStream::new_list(response.len() + 2); + stream.append(&req_id).append(&cur_buffer); + + for receipts in response { + stream.append_raw(&receipts, 1); + } + + stream.out() + }).map_err(Into::into) } // Receive a response for receipts. @@ -401,8 +437,54 @@ impl LightProtocol { } // Handle a request for proofs. - fn get_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { - unimplemented!() + fn get_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + const MAX_PROOFS: usize = 128; + + let peers = self.peers.read(); + let peer = match peers.get(peer) { + Some(peer) => peer, + None => { + debug!(target: "les", "Ignoring request from unknown peer"); + return Ok(()) + } + }; + + let req_id: u64 = try!(data.val_at(0)); + + let req = { + let requests: Result, Error> = data.iter().skip(1).take(MAX_PROOFS).map(|x| { + Ok(request::StateProof { + block: try!(x.val_at(0)), + key1: try!(x.val_at(1)), + key2: if try!(x.at(2)).is_empty() { None } else { Some(try!(x.val_at(2))) }, + from_level: try!(x.val_at(3)), + }) + }).collect(); + + request::StateProofs { + requests: try!(requests), + } + }; + + let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::StateProofs, req.requests.len())); + + let response = self.provider.proofs(req); + let response_len = response.iter().filter(|x| &x[..] != &::rlp::EMPTY_LIST_RLP).count(); + let actual_cost = self.flow_params.compute_cost(request::Kind::StateProofs, response_len); + assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost."); + + let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost); + + io.respond(packet::PROOFS, { + let mut stream = RlpStream::new_list(response.len() + 2); + stream.append(&req_id).append(&cur_buffer); + + for proof in response { + stream.append_raw(&proof, 1); + } + + stream.out() + }).map_err(Into::into) } // Receive a response for proofs. @@ -411,8 +493,52 @@ impl LightProtocol { } // Handle a request for contract code. - fn get_contract_code(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { - unimplemented!() + fn get_contract_code(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + const MAX_CODES: usize = 256; + + let peers = self.peers.read(); + let peer = match peers.get(peer) { + Some(peer) => peer, + None => { + debug!(target: "les", "Ignoring request from unknown peer"); + return Ok(()) + } + }; + + let req_id: u64 = try!(data.val_at(0)); + + let req = { + let requests: Result, Error> = data.iter().skip(1).take(MAX_CODES).map(|x| { + Ok(request::ContractCode { + block_hash: try!(x.val_at(0)), + account_key: try!(x.val_at(1)), + }) + }).collect(); + + request::ContractCodes { + code_requests: try!(requests), + } + }; + + let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Codes, req.code_requests.len())); + + let response = self.provider.contract_code(req); + let response_len = response.iter().filter(|x| !x.is_empty()).count(); + let actual_cost = self.flow_params.compute_cost(request::Kind::Codes, response_len); + assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost."); + + let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost); + + io.respond(packet::CONTRACT_CODES, { + let mut stream = RlpStream::new_list(response.len() + 2); + stream.append(&req_id).append(&cur_buffer); + + for code in response { + stream.append_raw(&code, 1); + } + + stream.out() + }).map_err(Into::into) } // Receive a response for contract code. @@ -421,8 +547,53 @@ impl LightProtocol { } // Handle a request for header proofs - fn get_header_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { - unimplemented!() + fn get_header_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + const MAX_PROOFS: usize = 256; + + let peers = self.peers.read(); + let peer = match peers.get(peer) { + Some(peer) => peer, + None => { + debug!(target: "les", "Ignoring request from unknown peer"); + return Ok(()) + } + }; + + let req_id: u64 = try!(data.val_at(0)); + + let req = { + let requests: Result, Error> = data.iter().skip(1).take(MAX_PROOFS).map(|x| { + Ok(request::HeaderProof { + cht_number: try!(x.val_at(0)), + block_number: try!(x.val_at(1)), + from_level: try!(x.val_at(2)), + }) + }).collect(); + + request::HeaderProofs { + requests: try!(requests), + } + }; + + let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::HeaderProofs, req.requests.len())); + + let response = self.provider.header_proofs(req); + let response_len = response.iter().filter(|x| &x[..] != ::rlp::EMPTY_LIST_RLP).count(); + let actual_cost = self.flow_params.compute_cost(request::Kind::HeaderProofs, response_len); + assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost."); + + let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost); + + io.respond(packet::HEADER_PROOFS, { + let mut stream = RlpStream::new_list(response.len() + 2); + stream.append(&req_id).append(&cur_buffer); + + for proof in response { + stream.append_raw(&proof, 1); + } + + stream.out() + }).map_err(Into::into) } // Receive a response for header proofs diff --git a/ethcore/src/light/provider.rs b/ethcore/src/light/provider.rs index bf65acd18..5aa61828a 100644 --- a/ethcore/src/light/provider.rs +++ b/ethcore/src/light/provider.rs @@ -27,7 +27,8 @@ use light::request; /// Defines the operations that a provider for `LES` must fulfill. /// /// These are defined at [1], but may be subject to change. -/// Requests which can't be fulfilled should return an empty RLP list. +/// Requests which can't be fulfilled should return either an empty RLP list +/// or empty vector where appropriate. /// /// [1]: https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES) pub trait Provider: Send + Sync { @@ -35,6 +36,8 @@ pub trait Provider: Send + Sync { fn chain_info(&self) -> BlockChainInfo; /// Find the depth of a common ancestor between two blocks. + /// If either block is unknown or an ancestor can't be found + /// then return `None`. fn reorg_depth(&self, a: &H256, b: &H256) -> Option; /// Earliest block where state queries are available. @@ -59,10 +62,11 @@ pub trait Provider: Send + Sync { /// Provide a set of merkle proofs, as requested. Each request is a /// block hash and request parameters. /// - /// Returns a vector to RLP-encoded lists satisfying the requests. + /// Returns a vector of RLP-encoded lists satisfying the requests. fn proofs(&self, req: request::StateProofs) -> Vec; /// Provide contract code for the specified (block_hash, account_hash) pairs. + /// Each item in the resulting vector is either the raw bytecode or empty. fn contract_code(&self, req: request::ContractCodes) -> Vec; /// Provide header proofs from the Canonical Hash Tries. diff --git a/ethcore/src/types/les_request.rs b/ethcore/src/types/les_request.rs index 6e822dcdd..09d96db09 100644 --- a/ethcore/src/types/les_request.rs +++ b/ethcore/src/types/les_request.rs @@ -103,7 +103,7 @@ pub struct HeaderProof { #[derive(Debug, Clone, PartialEq, Eq, Binary)] pub struct HeaderProofs { /// All the proof requests. - pub requests: Vec, + pub requests: Vec, } /// Kinds of requests.