handle all LES requests

This commit is contained in:
Robert Habermeier 2016-11-15 18:19:16 +01:00
parent cb54152c23
commit 7bfb9e4003
4 changed files with 234 additions and 49 deletions

View File

@ -228,6 +228,16 @@ impl FlowParams {
buf.estimate = ::std::cmp::min(self.limit, buf.estimate + (elapsed * self.recharge));
}
/// Refund some buffer which was previously deducted.
/// Does not update the recharge timestamp.
pub fn refund(&self, buf: &mut Buffer, refund_amount: U256) {
buf.estimate = buf.estimate + refund_amount;
if buf.estimate > self.limit {
buf.estimate = self.limit
}
}
}
#[cfg(test)]

View File

@ -23,7 +23,7 @@ use io::TimerToken;
use network::{NetworkProtocolHandler, NetworkContext, NetworkError, PeerId};
use rlp::{RlpStream, Stream, UntrustedRlp, View};
use util::hash::H256;
use util::RwLock;
use util::{Mutex, RwLock, U256};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicUsize;
@ -94,7 +94,7 @@ struct PendingPeer {
// data about each peer.
struct Peer {
local_buffer: Buffer, // their buffer relative to us
local_buffer: Mutex<Buffer>, // their buffer relative to us
remote_buffer: Buffer, // our buffer relative to them
current_asking: HashSet<usize>, // pending request ids.
status: Status,
@ -103,6 +103,28 @@ struct Peer {
sent_head: H256, // last head we've given them.
}
impl Peer {
// check the maximum cost of a request, returning an error if there's
// not enough buffer left.
// returns the calculated maximum cost.
fn deduct_max(&self, flow_params: &FlowParams, kind: request::Kind, max: usize) -> Result<U256, Error> {
let mut local_buffer = self.local_buffer.lock();
flow_params.recharge(&mut local_buffer);
let max_cost = flow_params.compute_cost(kind, max);
try!(local_buffer.deduct_cost(max_cost));
Ok(max_cost)
}
// refund buffer for a request. returns new buffer amount.
fn refund(&self, flow_params: &FlowParams, amount: U256) -> U256 {
let mut local_buffer = self.local_buffer.lock();
flow_params.refund(&mut local_buffer, amount);
local_buffer.current()
}
}
/// This is an implementation of the light ethereum network protocol, abstracted
/// over a `Provider` of data and a p2p network.
///
@ -220,7 +242,7 @@ impl LightProtocol {
}
self.peers.write().insert(*peer, Peer {
local_buffer: self.flow_params.create_buffer(),
local_buffer: Mutex::new(self.flow_params.create_buffer()),
remote_buffer: flow_params.create_buffer(),
current_asking: HashSet::new(),
status: status,
@ -276,15 +298,15 @@ impl LightProtocol {
fn get_block_headers(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_HEADERS: usize = 512;
let mut present_buffer = match self.peers.read().get(peer) {
Some(peer) => peer.local_buffer.clone(),
let peers = self.peers.read();
let peer = match peers.get(peer) {
Some(peer) => peer,
None => {
debug!(target: "les", "Ignoring announcement from unknown peer");
debug!(target: "les", "Ignoring request from unknown peer");
return Ok(())
}
};
self.flow_params.recharge(&mut present_buffer);
let req_id: u64 = try!(data.val_at(0));
let block = {
@ -300,24 +322,13 @@ impl LightProtocol {
reverse: try!(data.val_at(4)),
};
let max_cost = self.flow_params.compute_cost(request::Kind::Headers, req.max);
try!(present_buffer.deduct_cost(max_cost));
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Headers, req.max));
let response = self.provider.block_headers(req);
let actual_cost = self.flow_params.compute_cost(request::Kind::Headers, response.len());
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = match self.peers.write().get_mut(peer) {
Some(peer) => {
self.flow_params.recharge(&mut peer.local_buffer);
try!(peer.local_buffer.deduct_cost(actual_cost));
peer.local_buffer.current()
}
None => {
debug!(target: "les", "peer disconnected during serving of request.");
return Ok(())
}
};
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::BLOCK_HEADERS, {
let mut stream = RlpStream::new_list(response.len() + 2);
stream.append(&req_id).append(&cur_buffer);
@ -339,39 +350,29 @@ impl LightProtocol {
fn get_block_bodies(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_BODIES: usize = 256;
let mut present_buffer = match self.peers.read().get(peer) {
Some(peer) => peer.local_buffer.clone(),
let peers = self.peers.read();
let peer = match peers.get(peer) {
Some(peer) => peer,
None => {
debug!(target: "les", "Ignoring announcement from unknown peer");
debug!(target: "les", "Ignoring request from unknown peer");
return Ok(())
}
};
self.flow_params.recharge(&mut present_buffer);
let req_id: u64 = try!(data.val_at(0));
let req = request::Bodies {
block_hashes: try!(data.iter().skip(1).take(MAX_BODIES).map(|x| x.as_val()).collect())
};
let max_cost = self.flow_params.compute_cost(request::Kind::Bodies, req.block_hashes.len());
try!(present_buffer.deduct_cost(max_cost));
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Bodies, req.block_hashes.len()));
let response = self.provider.block_bodies(req);
let response_len = response.iter().filter(|x| &x[..] != &::rlp::EMPTY_LIST_RLP).count();
let actual_cost = self.flow_params.compute_cost(request::Kind::Bodies, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = match self.peers.write().get_mut(peer) {
Some(peer) => {
self.flow_params.recharge(&mut peer.local_buffer);
try!(peer.local_buffer.deduct_cost(actual_cost));
peer.local_buffer.current()
}
None => {
debug!(target: "les", "peer disconnected during serving of request.");
return Ok(())
}
};
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::BLOCK_BODIES, {
let mut stream = RlpStream::new_list(response.len() + 2);
@ -391,8 +392,43 @@ impl LightProtocol {
}
// Handle a request for receipts.
fn get_receipts(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> {
unimplemented!()
fn get_receipts(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_RECEIPTS: usize = 256;
let peers = self.peers.read();
let peer = match peers.get(peer) {
Some(peer) => peer,
None => {
debug!(target: "les", "Ignoring request from unknown peer");
return Ok(())
}
};
let req_id: u64 = try!(data.val_at(0));
let req = request::Receipts {
block_hashes: try!(data.iter().skip(1).take(MAX_RECEIPTS).map(|x| x.as_val()).collect())
};
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Receipts, req.block_hashes.len()));
let response = self.provider.receipts(req);
let response_len = response.iter().filter(|x| &x[..] != &::rlp::EMPTY_LIST_RLP).count();
let actual_cost = self.flow_params.compute_cost(request::Kind::Receipts, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::RECEIPTS, {
let mut stream = RlpStream::new_list(response.len() + 2);
stream.append(&req_id).append(&cur_buffer);
for receipts in response {
stream.append_raw(&receipts, 1);
}
stream.out()
}).map_err(Into::into)
}
// Receive a response for receipts.
@ -401,8 +437,54 @@ impl LightProtocol {
}
// Handle a request for proofs.
fn get_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> {
unimplemented!()
fn get_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_PROOFS: usize = 128;
let peers = self.peers.read();
let peer = match peers.get(peer) {
Some(peer) => peer,
None => {
debug!(target: "les", "Ignoring request from unknown peer");
return Ok(())
}
};
let req_id: u64 = try!(data.val_at(0));
let req = {
let requests: Result<Vec<_>, Error> = data.iter().skip(1).take(MAX_PROOFS).map(|x| {
Ok(request::StateProof {
block: try!(x.val_at(0)),
key1: try!(x.val_at(1)),
key2: if try!(x.at(2)).is_empty() { None } else { Some(try!(x.val_at(2))) },
from_level: try!(x.val_at(3)),
})
}).collect();
request::StateProofs {
requests: try!(requests),
}
};
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::StateProofs, req.requests.len()));
let response = self.provider.proofs(req);
let response_len = response.iter().filter(|x| &x[..] != &::rlp::EMPTY_LIST_RLP).count();
let actual_cost = self.flow_params.compute_cost(request::Kind::StateProofs, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::PROOFS, {
let mut stream = RlpStream::new_list(response.len() + 2);
stream.append(&req_id).append(&cur_buffer);
for proof in response {
stream.append_raw(&proof, 1);
}
stream.out()
}).map_err(Into::into)
}
// Receive a response for proofs.
@ -411,8 +493,52 @@ impl LightProtocol {
}
// Handle a request for contract code.
fn get_contract_code(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> {
unimplemented!()
fn get_contract_code(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_CODES: usize = 256;
let peers = self.peers.read();
let peer = match peers.get(peer) {
Some(peer) => peer,
None => {
debug!(target: "les", "Ignoring request from unknown peer");
return Ok(())
}
};
let req_id: u64 = try!(data.val_at(0));
let req = {
let requests: Result<Vec<_>, Error> = data.iter().skip(1).take(MAX_CODES).map(|x| {
Ok(request::ContractCode {
block_hash: try!(x.val_at(0)),
account_key: try!(x.val_at(1)),
})
}).collect();
request::ContractCodes {
code_requests: try!(requests),
}
};
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Codes, req.code_requests.len()));
let response = self.provider.contract_code(req);
let response_len = response.iter().filter(|x| !x.is_empty()).count();
let actual_cost = self.flow_params.compute_cost(request::Kind::Codes, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::CONTRACT_CODES, {
let mut stream = RlpStream::new_list(response.len() + 2);
stream.append(&req_id).append(&cur_buffer);
for code in response {
stream.append_raw(&code, 1);
}
stream.out()
}).map_err(Into::into)
}
// Receive a response for contract code.
@ -421,8 +547,53 @@ impl LightProtocol {
}
// Handle a request for header proofs
fn get_header_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> {
unimplemented!()
fn get_header_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_PROOFS: usize = 256;
let peers = self.peers.read();
let peer = match peers.get(peer) {
Some(peer) => peer,
None => {
debug!(target: "les", "Ignoring request from unknown peer");
return Ok(())
}
};
let req_id: u64 = try!(data.val_at(0));
let req = {
let requests: Result<Vec<_>, Error> = data.iter().skip(1).take(MAX_PROOFS).map(|x| {
Ok(request::HeaderProof {
cht_number: try!(x.val_at(0)),
block_number: try!(x.val_at(1)),
from_level: try!(x.val_at(2)),
})
}).collect();
request::HeaderProofs {
requests: try!(requests),
}
};
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::HeaderProofs, req.requests.len()));
let response = self.provider.header_proofs(req);
let response_len = response.iter().filter(|x| &x[..] != ::rlp::EMPTY_LIST_RLP).count();
let actual_cost = self.flow_params.compute_cost(request::Kind::HeaderProofs, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::HEADER_PROOFS, {
let mut stream = RlpStream::new_list(response.len() + 2);
stream.append(&req_id).append(&cur_buffer);
for proof in response {
stream.append_raw(&proof, 1);
}
stream.out()
}).map_err(Into::into)
}
// Receive a response for header proofs

View File

@ -27,7 +27,8 @@ use light::request;
/// Defines the operations that a provider for `LES` must fulfill.
///
/// These are defined at [1], but may be subject to change.
/// Requests which can't be fulfilled should return an empty RLP list.
/// Requests which can't be fulfilled should return either an empty RLP list
/// or empty vector where appropriate.
///
/// [1]: https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES)
pub trait Provider: Send + Sync {
@ -35,6 +36,8 @@ pub trait Provider: Send + Sync {
fn chain_info(&self) -> BlockChainInfo;
/// Find the depth of a common ancestor between two blocks.
/// If either block is unknown or an ancestor can't be found
/// then return `None`.
fn reorg_depth(&self, a: &H256, b: &H256) -> Option<u64>;
/// Earliest block where state queries are available.
@ -59,10 +62,11 @@ pub trait Provider: Send + Sync {
/// Provide a set of merkle proofs, as requested. Each request is a
/// block hash and request parameters.
///
/// Returns a vector to RLP-encoded lists satisfying the requests.
/// Returns a vector of RLP-encoded lists satisfying the requests.
fn proofs(&self, req: request::StateProofs) -> Vec<Bytes>;
/// Provide contract code for the specified (block_hash, account_hash) pairs.
/// Each item in the resulting vector is either the raw bytecode or empty.
fn contract_code(&self, req: request::ContractCodes) -> Vec<Bytes>;
/// Provide header proofs from the Canonical Hash Tries.

View File

@ -103,7 +103,7 @@ pub struct HeaderProof {
#[derive(Debug, Clone, PartialEq, Eq, Binary)]
pub struct HeaderProofs {
/// All the proof requests.
pub requests: Vec<HeaderProofs>,
pub requests: Vec<HeaderProof>,
}
/// Kinds of requests.