Implement PIP messages, request builder, and handlers (#4945)

* return errors on database corruption

* fix tests, json tests

* fix remainder of build

* buffer flow -> request credits

* proving state backend

* generate transaction proofs from provider

* network messages for transaction proof

* transaction proof test

* test for transaction proof message

* fix call bug

* request transaction proofs from on_demand

* most of proved_execution rpc

* proved execution future

* initial request definitions

* RLP encoding and decoding for requests

* proofs of non-existance in ProvingBlockChainClient

* new requests in provider.

* encode and decode responses

* complete initial request changes

* handle request packet in LightProtocol

* handle response packets

* implement requesting from

* re-do cost table

* get tests compiling

* fix cost table RLP encoding

* roundtrip tests for request types

* request builder tests

* move request_builder -> request::builder

* get network tests working

* return only complete headers responses

* request builder improvements

* New version of jsonrpc.

* split request filling into fill,complete

* Better invalid encoding messages

* Fixing deprecated methods of tokio_core

* use PIP messages in on_demand, old API

* migrate oneshot::complete to send in on_demand

* get on_demand tests to compile

* port ethsync to PIP messages

* adjust to minor on_demand API changes in RPC

* Using dedicated branch for jsonrpc

* Bump
This commit is contained in:
Robert Habermeier 2017-03-23 13:17:05 +01:00 committed by Gav Wood
parent b931a225ba
commit 64cec5ff7d
28 changed files with 2800 additions and 2095 deletions

View File

@ -1,5 +1,5 @@
[package] [package]
description = "Parity LES primitives" description = "Parity Light Client Implementation"
homepage = "http://parity.io" homepage = "http://parity.io"
license = "GPL-3.0" license = "GPL-3.0"
name = "ethcore-light" name = "ethcore-light"

View File

@ -24,7 +24,6 @@
//! - It stores only headers (and a pruned subset of them) //! - It stores only headers (and a pruned subset of them)
//! - To allow for flexibility in the database layout once that's incorporated. //! - To allow for flexibility in the database layout once that's incorporated.
// TODO: use DB instead of memory. DB Layout: just the contents of `candidates`/`headers` // TODO: use DB instead of memory. DB Layout: just the contents of `candidates`/`headers`
//
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};

View File

@ -31,7 +31,7 @@ use ethcore::service::ClientIoMessage;
use ethcore::encoded; use ethcore::encoded;
use io::IoChannel; use io::IoChannel;
use util::{Bytes, DBValue, H256, Mutex, RwLock}; use util::{H256, Mutex, RwLock};
use self::header_chain::{AncestryIter, HeaderChain}; use self::header_chain::{AncestryIter, HeaderChain};
@ -315,50 +315,3 @@ impl LightChainClient for Client {
Client::cht_root(self, i) Client::cht_root(self, i)
} }
} }
// dummy implementation, should be removed when a `TestClient` is added.
impl ::provider::Provider for Client {
fn chain_info(&self) -> BlockChainInfo {
Client::chain_info(self)
}
fn reorg_depth(&self, _a: &H256, _b: &H256) -> Option<u64> {
None
}
fn earliest_state(&self) -> Option<u64> {
None
}
fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
Client::block_header(self, id)
}
fn block_body(&self, _id: BlockId) -> Option<encoded::Body> {
None
}
fn block_receipts(&self, _hash: &H256) -> Option<Bytes> {
None
}
fn state_proof(&self, _req: ::request::StateProof) -> Vec<Bytes> {
Vec::new()
}
fn contract_code(&self, _req: ::request::ContractCode) -> Bytes {
Vec::new()
}
fn header_proof(&self, _req: ::request::HeaderProof) -> Option<(encoded::Header, Vec<Bytes>)> {
None
}
fn transaction_proof(&self, _req: ::request::TransactionProof) -> Option<Vec<DBValue>> {
None
}
fn ready_transactions(&self) -> Vec<::ethcore::transaction::PendingTransaction> {
Vec::new()
}
}

View File

@ -26,7 +26,7 @@
//! use-cases like sending transactions from a personal account. //! use-cases like sending transactions from a personal account.
//! //!
//! The light client performs a header-only sync, doing verification and pruning //! The light client performs a header-only sync, doing verification and pruning
//! historical blocks. Upon pruning, batches of 2048 blocks have a number => hash //! historical blocks. Upon pruning, batches of 2048 blocks have a number => (hash, TD)
//! mapping sealed into "canonical hash tries" which can later be used to verify //! mapping sealed into "canonical hash tries" which can later be used to verify
//! historical block queries from peers. //! historical block queries from peers.
@ -57,7 +57,7 @@ mod types;
pub use self::provider::Provider; pub use self::provider::Provider;
pub use self::transaction_queue::TransactionQueue; pub use self::transaction_queue::TransactionQueue;
pub use types::les_request as request; pub use types::request as request;
#[macro_use] #[macro_use]
extern crate log; extern crate log;

View File

@ -12,7 +12,7 @@
// GNU General Public License for more details. // GNU General Public License for more details.
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! I/O and event context generalizations. //! I/O and event context generalizations.
@ -20,7 +20,7 @@ use network::{NetworkContext, PeerId, NodeId};
use super::{Announcement, LightProtocol, ReqId}; use super::{Announcement, LightProtocol, ReqId};
use super::error::Error; use super::error::Error;
use request::{self, Request}; use request::Requests;
/// An I/O context which allows sending and receiving packets as well as /// An I/O context which allows sending and receiving packets as well as
/// disconnecting peers. This is used as a generalization of the portions /// disconnecting peers. This is used as a generalization of the portions
@ -50,13 +50,13 @@ pub trait IoContext {
impl<'a> IoContext for NetworkContext<'a> { impl<'a> IoContext for NetworkContext<'a> {
fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec<u8>) { fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec<u8>) {
if let Err(e) = self.send(peer, packet_id, packet_body) { if let Err(e) = self.send(peer, packet_id, packet_body) {
debug!(target: "les", "Error sending packet to peer {}: {}", peer, e); debug!(target: "pip", "Error sending packet to peer {}: {}", peer, e);
} }
} }
fn respond(&self, packet_id: u8, packet_body: Vec<u8>) { fn respond(&self, packet_id: u8, packet_body: Vec<u8>) {
if let Err(e) = self.respond(packet_id, packet_body) { if let Err(e) = self.respond(packet_id, packet_body) {
debug!(target: "les", "Error responding to peer message: {}", e); debug!(target: "pip", "Error responding to peer message: {}", e);
} }
} }
@ -83,16 +83,17 @@ pub trait BasicContext {
fn persistent_peer_id(&self, peer: PeerId) -> Option<NodeId>; fn persistent_peer_id(&self, peer: PeerId) -> Option<NodeId>;
/// Make a request from a peer. /// Make a request from a peer.
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error>; ///
/// Fails on: nonexistent peer, network error, peer not server,
/// insufficient credits. Does not check capabilities before sending.
/// On success, returns a request id which can later be coordinated
/// with an event.
fn request_from(&self, peer: PeerId, request: Requests) -> Result<ReqId, Error>;
/// Make an announcement of new capabilities to the rest of the peers. /// Make an announcement of new capabilities to the rest of the peers.
// TODO: maybe just put this on a timer in LightProtocol? // TODO: maybe just put this on a timer in LightProtocol?
fn make_announcement(&self, announcement: Announcement); fn make_announcement(&self, announcement: Announcement);
/// Find the maximum number of requests of a specific type which can be made from
/// supplied peer.
fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize;
/// Disconnect a peer. /// Disconnect a peer.
fn disconnect_peer(&self, peer: PeerId); fn disconnect_peer(&self, peer: PeerId);
@ -123,18 +124,14 @@ impl<'a> BasicContext for TickCtx<'a> {
self.io.persistent_peer_id(id) self.io.persistent_peer_id(id)
} }
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error> { fn request_from(&self, peer: PeerId, requests: Requests) -> Result<ReqId, Error> {
self.proto.request_from(self.io, &peer, request) self.proto.request_from(self.io, &peer, requests)
} }
fn make_announcement(&self, announcement: Announcement) { fn make_announcement(&self, announcement: Announcement) {
self.proto.make_announcement(self.io, announcement); self.proto.make_announcement(self.io, announcement);
} }
fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize {
self.proto.max_requests(peer, kind)
}
fn disconnect_peer(&self, peer: PeerId) { fn disconnect_peer(&self, peer: PeerId) {
self.io.disconnect_peer(peer); self.io.disconnect_peer(peer);
} }
@ -160,18 +157,14 @@ impl<'a> BasicContext for Ctx<'a> {
self.io.persistent_peer_id(id) self.io.persistent_peer_id(id)
} }
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error> { fn request_from(&self, peer: PeerId, requests: Requests) -> Result<ReqId, Error> {
self.proto.request_from(self.io, &peer, request) self.proto.request_from(self.io, &peer, requests)
} }
fn make_announcement(&self, announcement: Announcement) { fn make_announcement(&self, announcement: Announcement) {
self.proto.make_announcement(self.io, announcement); self.proto.make_announcement(self.io, announcement);
} }
fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize {
self.proto.max_requests(peer, kind)
}
fn disconnect_peer(&self, peer: PeerId) { fn disconnect_peer(&self, peer: PeerId) {
self.io.disconnect_peer(peer); self.io.disconnect_peer(peer);
} }

View File

@ -56,6 +56,8 @@ pub enum Error {
UnknownPeer, UnknownPeer,
/// Unsolicited response. /// Unsolicited response.
UnsolicitedResponse, UnsolicitedResponse,
/// Bad back-reference in request.
BadBackReference,
/// Not a server. /// Not a server.
NotServer, NotServer,
/// Unsupported protocol version. /// Unsupported protocol version.
@ -78,6 +80,7 @@ impl Error {
Error::WrongNetwork => Punishment::Disable, Error::WrongNetwork => Punishment::Disable,
Error::UnknownPeer => Punishment::Disconnect, Error::UnknownPeer => Punishment::Disconnect,
Error::UnsolicitedResponse => Punishment::Disable, Error::UnsolicitedResponse => Punishment::Disable,
Error::BadBackReference => Punishment::Disable,
Error::NotServer => Punishment::Disable, Error::NotServer => Punishment::Disable,
Error::UnsupportedProtocolVersion(_) => Punishment::Disable, Error::UnsupportedProtocolVersion(_) => Punishment::Disable,
Error::BadProtocolVersion => Punishment::Disable, Error::BadProtocolVersion => Punishment::Disable,
@ -109,6 +112,7 @@ impl fmt::Display for Error {
Error::WrongNetwork => write!(f, "Wrong network"), Error::WrongNetwork => write!(f, "Wrong network"),
Error::UnknownPeer => write!(f, "Unknown peer"), Error::UnknownPeer => write!(f, "Unknown peer"),
Error::UnsolicitedResponse => write!(f, "Peer provided unsolicited data"), Error::UnsolicitedResponse => write!(f, "Peer provided unsolicited data"),
Error::BadBackReference => write!(f, "Bad back-reference in request."),
Error::NotServer => write!(f, "Peer not a server."), Error::NotServer => write!(f, "Peer not a server."),
Error::UnsupportedProtocolVersion(pv) => write!(f, "Unsupported protocol version: {}", pv), Error::UnsupportedProtocolVersion(pv) => write!(f, "Unsupported protocol version: {}", pv),
Error::BadProtocolVersion => write!(f, "Bad protocol version in handshake"), Error::BadProtocolVersion => write!(f, "Bad protocol version in handshake"),

File diff suppressed because it is too large Load Diff

View File

@ -26,18 +26,13 @@
//! Current default costs are picked completely arbitrarily, not based //! Current default costs are picked completely arbitrarily, not based
//! on any empirical timings or mathematical models. //! on any empirical timings or mathematical models.
use request; use request::{self, Request};
use super::packet;
use super::error::Error; use super::error::Error;
use rlp::*; use rlp::*;
use util::U256; use util::U256;
use time::{Duration, SteadyTime}; use time::{Duration, SteadyTime};
/// A request cost specification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cost(pub U256, pub U256);
/// Credits value. /// Credits value.
/// ///
/// Produced and recharged using `FlowParams`. /// Produced and recharged using `FlowParams`.
@ -81,90 +76,95 @@ impl Credits {
/// A cost table, mapping requests to base and per-request costs. /// A cost table, mapping requests to base and per-request costs.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct CostTable { pub struct CostTable {
headers: Cost, // cost per header base: U256, // cost per packet.
bodies: Cost, headers: U256, // cost per header
receipts: Cost, body: U256,
state_proofs: Cost, receipts: U256,
contract_codes: Cost, account: U256,
header_proofs: Cost, storage: U256,
transaction_proof: Cost, // cost per gas. code: U256,
header_proof: U256,
transaction_proof: U256, // cost per gas.
} }
impl Default for CostTable { impl Default for CostTable {
fn default() -> Self { fn default() -> Self {
// arbitrarily chosen constants. // arbitrarily chosen constants.
CostTable { CostTable {
headers: Cost(100000.into(), 10000.into()), base: 100000.into(),
bodies: Cost(150000.into(), 15000.into()), headers: 10000.into(),
receipts: Cost(50000.into(), 5000.into()), body: 15000.into(),
state_proofs: Cost(250000.into(), 25000.into()), receipts: 5000.into(),
contract_codes: Cost(200000.into(), 20000.into()), account: 25000.into(),
header_proofs: Cost(150000.into(), 15000.into()), storage: 25000.into(),
transaction_proof: Cost(100000.into(), 2.into()), code: 20000.into(),
header_proof: 15000.into(),
transaction_proof: 2.into(),
} }
} }
} }
impl Encodable for CostTable { impl Encodable for CostTable {
fn rlp_append(&self, s: &mut RlpStream) { fn rlp_append(&self, s: &mut RlpStream) {
fn append_cost(s: &mut RlpStream, msg_id: u8, cost: &Cost) { fn append_cost(s: &mut RlpStream, cost: &U256, kind: request::Kind) {
s.begin_list(3) s.begin_list(2);
.append(&msg_id)
.append(&cost.0) // hack around https://github.com/ethcore/parity/issues/4356
.append(&cost.1); Encodable::rlp_append(&kind, s);
s.append(cost);
} }
s.begin_list(7); s.begin_list(9).append(&self.base);
append_cost(s, &self.headers, request::Kind::Headers);
append_cost(s, packet::GET_BLOCK_HEADERS, &self.headers); append_cost(s, &self.body, request::Kind::Body);
append_cost(s, packet::GET_BLOCK_BODIES, &self.bodies); append_cost(s, &self.receipts, request::Kind::Receipts);
append_cost(s, packet::GET_RECEIPTS, &self.receipts); append_cost(s, &self.account, request::Kind::Account);
append_cost(s, packet::GET_PROOFS, &self.state_proofs); append_cost(s, &self.storage, request::Kind::Storage);
append_cost(s, packet::GET_CONTRACT_CODES, &self.contract_codes); append_cost(s, &self.code, request::Kind::Code);
append_cost(s, packet::GET_HEADER_PROOFS, &self.header_proofs); append_cost(s, &self.header_proof, request::Kind::HeaderProof);
append_cost(s, packet::GET_TRANSACTION_PROOF, &self.transaction_proof); append_cost(s, &self.transaction_proof, request::Kind::Execution);
} }
} }
impl Decodable for CostTable { impl Decodable for CostTable {
fn decode(rlp: &UntrustedRlp) -> Result<Self, DecoderError> { fn decode(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
let base = rlp.val_at(0)?;
let mut headers = None; let mut headers = None;
let mut bodies = None; let mut body = None;
let mut receipts = None; let mut receipts = None;
let mut state_proofs = None; let mut account = None;
let mut contract_codes = None; let mut storage = None;
let mut header_proofs = None; let mut code = None;
let mut header_proof = None;
let mut transaction_proof = None; let mut transaction_proof = None;
for row in rlp.iter() { for cost_list in rlp.iter().skip(1) {
let msg_id: u8 = row.val_at(0)?; let cost = cost_list.val_at(1)?;
let cost = { match cost_list.val_at(0)? {
let base = row.val_at(1)?; request::Kind::Headers => headers = Some(cost),
let per = row.val_at(2)?; request::Kind::Body => body = Some(cost),
request::Kind::Receipts => receipts = Some(cost),
Cost(base, per) request::Kind::Account => account = Some(cost),
}; request::Kind::Storage => storage = Some(cost),
request::Kind::Code => code = Some(cost),
match msg_id { request::Kind::HeaderProof => header_proof = Some(cost),
packet::GET_BLOCK_HEADERS => headers = Some(cost), request::Kind::Execution => transaction_proof = Some(cost),
packet::GET_BLOCK_BODIES => bodies = Some(cost),
packet::GET_RECEIPTS => receipts = Some(cost),
packet::GET_PROOFS => state_proofs = Some(cost),
packet::GET_CONTRACT_CODES => contract_codes = Some(cost),
packet::GET_HEADER_PROOFS => header_proofs = Some(cost),
packet::GET_TRANSACTION_PROOF => transaction_proof = Some(cost),
_ => return Err(DecoderError::Custom("Unrecognized message in cost table")),
} }
} }
let unwrap_cost = |cost: Option<U256>| cost.ok_or(DecoderError::Custom("Not all costs specified in cost table."));
Ok(CostTable { Ok(CostTable {
headers: headers.ok_or(DecoderError::Custom("No headers cost specified"))?, base: base,
bodies: bodies.ok_or(DecoderError::Custom("No bodies cost specified"))?, headers: unwrap_cost(headers)?,
receipts: receipts.ok_or(DecoderError::Custom("No receipts cost specified"))?, body: unwrap_cost(body)?,
state_proofs: state_proofs.ok_or(DecoderError::Custom("No proofs cost specified"))?, receipts: unwrap_cost(receipts)?,
contract_codes: contract_codes.ok_or(DecoderError::Custom("No contract codes specified"))?, account: unwrap_cost(account)?,
header_proofs: header_proofs.ok_or(DecoderError::Custom("No header proofs cost specified"))?, storage: unwrap_cost(storage)?,
transaction_proof: transaction_proof.ok_or(DecoderError::Custom("No transaction proof gas cost specified"))?, code: unwrap_cost(code)?,
header_proof: unwrap_cost(header_proof)?,
transaction_proof: unwrap_cost(transaction_proof)?,
}) })
} }
} }
@ -190,17 +190,19 @@ impl FlowParams {
/// Create effectively infinite flow params. /// Create effectively infinite flow params.
pub fn free() -> Self { pub fn free() -> Self {
let free_cost = Cost(0.into(), 0.into()); let free_cost: U256 = 0.into();
FlowParams { FlowParams {
limit: (!0u64).into(), limit: (!0u64).into(),
recharge: 1.into(), recharge: 1.into(),
costs: CostTable { costs: CostTable {
base: free_cost.clone(),
headers: free_cost.clone(), headers: free_cost.clone(),
bodies: free_cost.clone(), body: free_cost.clone(),
receipts: free_cost.clone(), receipts: free_cost.clone(),
state_proofs: free_cost.clone(), account: free_cost.clone(),
contract_codes: free_cost.clone(), storage: free_cost.clone(),
header_proofs: free_cost.clone(), code: free_cost.clone(),
header_proof: free_cost.clone(),
transaction_proof: free_cost, transaction_proof: free_cost,
} }
} }
@ -212,61 +214,34 @@ impl FlowParams {
/// Get a reference to the cost table. /// Get a reference to the cost table.
pub fn cost_table(&self) -> &CostTable { &self.costs } pub fn cost_table(&self) -> &CostTable { &self.costs }
/// Get the base cost of a request.
pub fn base_cost(&self) -> U256 { self.costs.base }
/// Get a reference to the recharge rate. /// Get a reference to the recharge rate.
pub fn recharge_rate(&self) -> &U256 { &self.recharge } pub fn recharge_rate(&self) -> &U256 { &self.recharge }
/// Compute the actual cost of a request, given the kind of request /// Compute the actual cost of a request, given the kind of request
/// and number of requests made. /// and number of requests made.
pub fn compute_cost(&self, kind: request::Kind, amount: usize) -> U256 { pub fn compute_cost(&self, request: &Request) -> U256 {
let cost = match kind { match *request {
request::Kind::Headers => &self.costs.headers, Request::Headers(ref req) => self.costs.headers * req.max.into(),
request::Kind::Bodies => &self.costs.bodies, Request::HeaderProof(_) => self.costs.header_proof,
request::Kind::Receipts => &self.costs.receipts, Request::Body(_) => self.costs.body,
request::Kind::StateProofs => &self.costs.state_proofs, Request::Receipts(_) => self.costs.receipts,
request::Kind::Codes => &self.costs.contract_codes, Request::Account(_) => self.costs.account,
request::Kind::HeaderProofs => &self.costs.header_proofs, Request::Storage(_) => self.costs.storage,
request::Kind::TransactionProof => &self.costs.transaction_proof, Request::Code(_) => self.costs.code,
}; Request::Execution(ref req) => self.costs.transaction_proof * req.gas,
let amount: U256 = amount.into();
cost.0 + (amount * cost.1)
}
/// Compute the maximum number of costs of a specific kind which can be made
/// with the given amount of credits
/// Saturates at `usize::max()`. This is not a problem in practice because
/// this amount of requests is already prohibitively large.
pub fn max_amount(&self, credits: &Credits, kind: request::Kind) -> usize {
use util::Uint;
use std::usize;
let cost = match kind {
request::Kind::Headers => &self.costs.headers,
request::Kind::Bodies => &self.costs.bodies,
request::Kind::Receipts => &self.costs.receipts,
request::Kind::StateProofs => &self.costs.state_proofs,
request::Kind::Codes => &self.costs.contract_codes,
request::Kind::HeaderProofs => &self.costs.header_proofs,
request::Kind::TransactionProof => &self.costs.transaction_proof,
};
let start = credits.current();
if start <= cost.0 {
return 0;
} else if cost.1 == U256::zero() {
return usize::MAX;
}
let max = (start - cost.0) / cost.1;
if max >= usize::MAX.into() {
usize::MAX
} else {
max.as_u64() as usize
} }
} }
/// Create initial credits.. /// Compute the cost of a set of requests.
/// This is the base cost plus the cost of each individual request.
pub fn compute_cost_multi(&self, requests: &[Request]) -> U256 {
requests.iter().fold(self.costs.base, |cost, req| cost + self.compute_cost(req))
}
/// Create initial credits.
pub fn create_credits(&self) -> Credits { pub fn create_credits(&self) -> Credits {
Credits { Credits {
estimate: self.limit, estimate: self.limit,

View File

@ -24,7 +24,8 @@
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::iter::FromIterator; use std::iter::FromIterator;
use request::{self, Request}; use request::Request;
use request::Requests;
use net::{timeout, ReqId}; use net::{timeout, ReqId};
use time::{Duration, SteadyTime}; use time::{Duration, SteadyTime};
@ -35,7 +36,7 @@ pub struct RequestSet {
counter: u64, counter: u64,
base: Option<SteadyTime>, base: Option<SteadyTime>,
ids: HashMap<ReqId, u64>, ids: HashMap<ReqId, u64>,
reqs: BTreeMap<u64, Request>, reqs: BTreeMap<u64, Requests>,
} }
impl Default for RequestSet { impl Default for RequestSet {
@ -50,8 +51,8 @@ impl Default for RequestSet {
} }
impl RequestSet { impl RequestSet {
/// Push a request onto the stack. /// Push requests onto the stack.
pub fn insert(&mut self, req_id: ReqId, req: Request, now: SteadyTime) { pub fn insert(&mut self, req_id: ReqId, req: Requests, now: SteadyTime) {
let counter = self.counter; let counter = self.counter;
self.ids.insert(req_id, counter); self.ids.insert(req_id, counter);
self.reqs.insert(counter, req); self.reqs.insert(counter, req);
@ -63,8 +64,8 @@ impl RequestSet {
self.counter += 1; self.counter += 1;
} }
/// Remove a request from the stack. /// Remove a set of requests from the stack.
pub fn remove(&mut self, req_id: &ReqId, now: SteadyTime) -> Option<Request> { pub fn remove(&mut self, req_id: &ReqId, now: SteadyTime) -> Option<Requests> {
let id = match self.ids.remove(&req_id) { let id = match self.ids.remove(&req_id) {
Some(id) => id, Some(id) => id,
None => return None, None => return None,
@ -89,22 +90,10 @@ impl RequestSet {
None => return false, None => return false,
}; };
let kind = self.reqs.values() let first_req = self.reqs.values().next()
.next() .expect("base existing implies `reqs` non-empty; qed");
.map(|r| r.kind())
.expect("base time implies `reqs` non-empty; qed");
let kind_timeout = match kind { base + compute_timeout(&first_req) <= now
request::Kind::Headers => timeout::HEADERS,
request::Kind::Bodies => timeout::BODIES,
request::Kind::Receipts => timeout::RECEIPTS,
request::Kind::StateProofs => timeout::PROOFS,
request::Kind::Codes => timeout::CONTRACT_CODES,
request::Kind::HeaderProofs => timeout::HEADER_PROOFS,
request::Kind::TransactionProof => timeout::TRANSACTION_PROOF,
};
base + Duration::milliseconds(kind_timeout) <= now
} }
/// Collect all pending request ids. /// Collect all pending request ids.
@ -121,25 +110,43 @@ impl RequestSet {
pub fn is_empty(&self) -> bool { self.len() == 0 } pub fn is_empty(&self) -> bool { self.len() == 0 }
} }
// helper to calculate timeout for a specific set of requests.
// it's a base amount + some amount per request.
fn compute_timeout(reqs: &Requests) -> Duration {
Duration::milliseconds(reqs.requests().iter().fold(timeout::BASE, |tm, req| {
tm + match *req {
Request::Headers(_) => timeout::HEADERS,
Request::HeaderProof(_) => timeout::HEADER_PROOF,
Request::Receipts(_) => timeout::RECEIPT,
Request::Body(_) => timeout::BODY,
Request::Account(_) => timeout::PROOF,
Request::Storage(_) => timeout::PROOF,
Request::Code(_) => timeout::CONTRACT_CODE,
Request::Execution(_) => timeout::TRANSACTION_PROOF,
}
}))
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use net::{timeout, ReqId}; use net::ReqId;
use request::{Request, Receipts}; use request::RequestBuilder;
use time::{SteadyTime, Duration}; use time::{SteadyTime, Duration};
use super::RequestSet; use super::{RequestSet, compute_timeout};
#[test] #[test]
fn multi_timeout() { fn multi_timeout() {
let test_begin = SteadyTime::now(); let test_begin = SteadyTime::now();
let mut req_set = RequestSet::default(); let mut req_set = RequestSet::default();
let the_req = Request::Receipts(Receipts { block_hashes: Vec::new() }); let the_req = RequestBuilder::default().build();
let req_time = compute_timeout(&the_req);
req_set.insert(ReqId(0), the_req.clone(), test_begin); req_set.insert(ReqId(0), the_req.clone(), test_begin);
req_set.insert(ReqId(1), the_req, test_begin + Duration::seconds(1)); req_set.insert(ReqId(1), the_req, test_begin + Duration::seconds(1));
assert_eq!(req_set.base, Some(test_begin)); assert_eq!(req_set.base, Some(test_begin));
let test_end = test_begin + Duration::milliseconds(timeout::RECEIPTS); let test_end = test_begin + req_time;
assert!(req_set.check_timeout(test_end)); assert!(req_set.check_timeout(test_end));
req_set.remove(&ReqId(0), test_begin + Duration::seconds(1)).unwrap(); req_set.remove(&ReqId(0), test_begin + Duration::seconds(1)).unwrap();

View File

@ -27,15 +27,31 @@ use network::{PeerId, NodeId};
use net::request_credits::FlowParams; use net::request_credits::FlowParams;
use net::context::IoContext; use net::context::IoContext;
use net::status::{Capabilities, Status, write_handshake}; use net::status::{Capabilities, Status, write_handshake};
use net::{encode_request, LightProtocol, Params, packet, Peer}; use net::{LightProtocol, Params, packet, Peer};
use provider::Provider; use provider::Provider;
use request::{self, Request, Headers}; use request;
use request::*;
use rlp::*; use rlp::*;
use util::{Address, Bytes, DBValue, H256, U256}; use util::{Address, H256, U256};
use std::sync::Arc; use std::sync::Arc;
// helper for encoding a single request into a packet.
// panics on bad backreference.
fn encode_single(request: Request) -> Requests {
let mut builder = RequestBuilder::default();
builder.push(request).unwrap();
builder.build()
}
// helper for making a packet out of `Requests`.
fn make_packet(req_id: usize, requests: &Requests) -> Vec<u8> {
let mut stream = RlpStream::new_list(2);
stream.append(&req_id).append_list(&requests.requests());
stream.out()
}
// expected result from a call. // expected result from a call.
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
enum Expect { enum Expect {
@ -99,35 +115,45 @@ impl Provider for TestProvider {
self.0.client.block_header(id) self.0.client.block_header(id)
} }
fn block_body(&self, id: BlockId) -> Option<encoded::Body> { fn block_body(&self, req: request::CompleteBodyRequest) -> Option<request::BodyResponse> {
self.0.client.block_body(id) self.0.client.block_body(req)
} }
fn block_receipts(&self, hash: &H256) -> Option<Bytes> { fn block_receipts(&self, req: request::CompleteReceiptsRequest) -> Option<request::ReceiptsResponse> {
self.0.client.block_receipts(&hash) self.0.client.block_receipts(req)
} }
fn state_proof(&self, req: request::StateProof) -> Vec<Bytes> { fn account_proof(&self, req: request::CompleteAccountRequest) -> Option<request::AccountResponse> {
match req.key2 { // sort of a leaf node
Some(_) => vec![::util::sha3::SHA3_NULL_RLP.to_vec()], let mut stream = RlpStream::new_list(2);
None => { stream.append(&req.address_hash).append_empty_data();
// sort of a leaf node Some(AccountResponse {
let mut stream = RlpStream::new_list(2); proof: vec![stream.out()],
stream.append(&req.key1).append_empty_data(); balance: 10.into(),
vec![stream.out()] nonce: 100.into(),
} code_hash: Default::default(),
} storage_root: Default::default(),
})
} }
fn contract_code(&self, req: request::ContractCode) -> Bytes { fn storage_proof(&self, req: request::CompleteStorageRequest) -> Option<request::StorageResponse> {
req.account_key.iter().chain(req.account_key.iter()).cloned().collect() Some(StorageResponse {
proof: vec![::rlp::encode(&req.key_hash).to_vec()],
value: req.key_hash | req.address_hash,
})
} }
fn header_proof(&self, _req: request::HeaderProof) -> Option<(encoded::Header, Vec<Bytes>)> { fn contract_code(&self, req: request::CompleteCodeRequest) -> Option<request::CodeResponse> {
Some(CodeResponse {
code: req.block_hash.iter().chain(req.code_hash.iter()).cloned().collect(),
})
}
fn header_proof(&self, _req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse> {
None None
} }
fn transaction_proof(&self, _req: request::TransactionProof) -> Option<Vec<DBValue>> { fn transaction_proof(&self, _req: request::CompleteExecutionRequest) -> Option<request::ExecutionResponse> {
None None
} }
@ -226,14 +252,15 @@ fn credit_overflow() {
} }
// 1000 requests is far too many for the default flow params. // 1000 requests is far too many for the default flow params.
let request = encode_request(&Request::Headers(Headers { let requests = encode_single(Request::Headers(IncompleteHeadersRequest {
start: 1.into(), start: HashOrNumber::Number(1).into(),
max: 1000, max: 1000,
skip: 0, skip: 0,
reverse: false, reverse: false,
}), 111); }));
let request = make_packet(111, &requests);
proto.handle_packet(&Expect::Punish(1), &1, packet::GET_BLOCK_HEADERS, &request); proto.handle_packet(&Expect::Punish(1), &1, packet::REQUEST, &request);
} }
// test the basic request types -- these just make sure that requests are parsed // test the basic request types -- these just make sure that requests are parsed
@ -259,33 +286,36 @@ fn get_block_headers() {
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
} }
let request = Headers { let request = Request::Headers(IncompleteHeadersRequest {
start: 1.into(), start: HashOrNumber::Number(1).into(),
max: 10, max: 10,
skip: 0, skip: 0,
reverse: false, reverse: false,
}; });
let req_id = 111; let req_id = 111;
let request_body = encode_request(&Request::Headers(request.clone()), req_id); let requests = encode_single(request.clone());
let request_body = make_packet(req_id, &requests);
let response = { let response = {
let headers: Vec<_> = (0..10).map(|i| provider.client.block_header(BlockId::Number(i + 1)).unwrap()).collect(); let headers: Vec<_> = (0..10).map(|i| provider.client.block_header(BlockId::Number(i + 1)).unwrap()).collect();
assert_eq!(headers.len(), 10); assert_eq!(headers.len(), 10);
let new_creds = *flow_params.limit() - flow_params.compute_cost(request::Kind::Headers, 10); let new_creds = *flow_params.limit() - flow_params.compute_cost_multi(requests.requests());
let mut response_stream = RlpStream::new_list(3); let response = vec![Response::Headers(HeadersResponse {
headers: headers,
})];
response_stream.append(&req_id).append(&new_creds).begin_list(10); let mut stream = RlpStream::new_list(3);
for header in headers { stream.append(&req_id).append(&new_creds).append_list(&response);
response_stream.append_raw(&header.into_inner(), 1);
}
response_stream.out() stream.out()
}; };
let expected = Expect::Respond(packet::BLOCK_HEADERS, response); let expected = Expect::Respond(packet::RESPONSE, response);
proto.handle_packet(&expected, &1, packet::GET_BLOCK_HEADERS, &request_body); proto.handle_packet(&expected, &1, packet::REQUEST, &request_body);
} }
#[test] #[test]
@ -308,33 +338,32 @@ fn get_block_bodies() {
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
} }
let request = request::Bodies { let mut builder = RequestBuilder::default();
block_hashes: (0..10).map(|i| let mut bodies = Vec::new();
provider.client.block_header(BlockId::Number(i)).unwrap().hash()
).collect()
};
for i in 0..10 {
let hash = provider.client.block_header(BlockId::Number(i)).unwrap().hash();
builder.push(Request::Body(IncompleteBodyRequest {
hash: hash.into(),
})).unwrap();
bodies.push(Response::Body(provider.client.block_body(CompleteBodyRequest {
hash: hash,
}).unwrap()));
}
let req_id = 111; let req_id = 111;
let requests = builder.build();
let request_body = make_packet(req_id, &requests);
let request_body = encode_request(&Request::Bodies(request.clone()), req_id);
let response = { let response = {
let bodies: Vec<_> = (0..10).map(|i| provider.client.block_body(BlockId::Number(i + 1)).unwrap()).collect(); let new_creds = *flow_params.limit() - flow_params.compute_cost_multi(requests.requests());
assert_eq!(bodies.len(), 10);
let new_creds = *flow_params.limit() - flow_params.compute_cost(request::Kind::Bodies, 10);
let mut response_stream = RlpStream::new_list(3); let mut response_stream = RlpStream::new_list(3);
response_stream.append(&req_id).append(&new_creds).append_list(&bodies);
response_stream.append(&req_id).append(&new_creds).begin_list(10);
for body in bodies {
response_stream.append_raw(&body.into_inner(), 1);
}
response_stream.out() response_stream.out()
}; };
let expected = Expect::Respond(packet::BLOCK_BODIES, response); let expected = Expect::Respond(packet::RESPONSE, response);
proto.handle_packet(&expected, &1, packet::GET_BLOCK_BODIES, &request_body); proto.handle_packet(&expected, &1, packet::REQUEST, &request_body);
} }
#[test] #[test]
@ -359,36 +388,37 @@ fn get_block_receipts() {
// find the first 10 block hashes starting with `f` because receipts are only provided // find the first 10 block hashes starting with `f` because receipts are only provided
// by the test client in that case. // by the test client in that case.
let block_hashes: Vec<_> = (0..1000).map(|i| let block_hashes: Vec<H256> = (0..1000)
provider.client.block_header(BlockId::Number(i)).unwrap().hash() .map(|i| provider.client.block_header(BlockId::Number(i)).unwrap().hash())
).filter(|hash| format!("{}", hash).starts_with("f")).take(10).collect(); .filter(|hash| format!("{}", hash).starts_with("f"))
.take(10)
.collect();
let request = request::Receipts { let mut builder = RequestBuilder::default();
block_hashes: block_hashes.clone(), let mut receipts = Vec::new();
}; for hash in block_hashes.iter().cloned() {
builder.push(Request::Receipts(IncompleteReceiptsRequest { hash: hash.into() })).unwrap();
receipts.push(Response::Receipts(provider.client.block_receipts(CompleteReceiptsRequest {
hash: hash
}).unwrap()));
}
let req_id = 111; let req_id = 111;
let requests = builder.build();
let request_body = make_packet(req_id, &requests);
let request_body = encode_request(&Request::Receipts(request.clone()), req_id);
let response = { let response = {
let receipts: Vec<_> = block_hashes.iter() assert_eq!(receipts.len(), 10);
.map(|hash| provider.client.block_receipts(hash).unwrap())
.collect();
let new_creds = *flow_params.limit() - flow_params.compute_cost(request::Kind::Receipts, receipts.len()); let new_creds = *flow_params.limit() - flow_params.compute_cost_multi(requests.requests());
let mut response_stream = RlpStream::new_list(3); let mut response_stream = RlpStream::new_list(3);
response_stream.append(&req_id).append(&new_creds).append_list(&receipts);
response_stream.append(&req_id).append(&new_creds).begin_list(receipts.len());
for block_receipts in receipts {
response_stream.append_raw(&block_receipts, 1);
}
response_stream.out() response_stream.out()
}; };
let expected = Expect::Respond(packet::RECEIPTS, response); let expected = Expect::Respond(packet::RESPONSE, response);
proto.handle_packet(&expected, &1, packet::GET_RECEIPTS, &request_body); proto.handle_packet(&expected, &1, packet::REQUEST, &request_body);
} }
#[test] #[test]
@ -397,8 +427,9 @@ fn get_state_proofs() {
let capabilities = capabilities(); let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let provider = TestProvider(provider);
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.0.client.chain_info());
{ {
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
@ -407,40 +438,45 @@ fn get_state_proofs() {
} }
let req_id = 112; let req_id = 112;
let key1 = U256::from(11223344).into(); let key1: H256 = U256::from(11223344).into();
let key2 = U256::from(99988887).into(); let key2: H256 = U256::from(99988887).into();
let request = Request::StateProofs (request::StateProofs { let mut builder = RequestBuilder::default();
requests: vec![ builder.push(Request::Account(IncompleteAccountRequest {
request::StateProof { block: H256::default(), key1: key1, key2: None, from_level: 0 }, block_hash: H256::default().into(),
request::StateProof { block: H256::default(), key1: key1, key2: Some(key2), from_level: 0}, address_hash: key1.into(),
] })).unwrap();
}); builder.push(Request::Storage(IncompleteStorageRequest {
block_hash: H256::default().into(),
address_hash: key1.into(),
key_hash: key2.into(),
})).unwrap();
let request_body = encode_request(&request, req_id); let requests = builder.build();
let request_body = make_packet(req_id, &requests);
let response = { let response = {
let proofs = vec![ let responses = vec![
{ let mut stream = RlpStream::new_list(2); stream.append(&key1).append_empty_data(); vec![stream.out()] }, Response::Account(provider.account_proof(CompleteAccountRequest {
vec![::util::sha3::SHA3_NULL_RLP.to_vec()], block_hash: H256::default(),
address_hash: key1,
}).unwrap()),
Response::Storage(provider.storage_proof(CompleteStorageRequest {
block_hash: H256::default(),
address_hash: key1,
key_hash: key2,
}).unwrap()),
]; ];
let new_creds = *flow_params.limit() - flow_params.compute_cost(request::Kind::StateProofs, 2); let new_creds = *flow_params.limit() - flow_params.compute_cost_multi(requests.requests());
let mut response_stream = RlpStream::new_list(3); let mut response_stream = RlpStream::new_list(3);
response_stream.append(&req_id).append(&new_creds).append_list(&responses);
response_stream.append(&req_id).append(&new_creds).begin_list(2);
for proof in proofs {
response_stream.begin_list(proof.len());
for node in proof {
response_stream.append_raw(&node, 1);
}
}
response_stream.out() response_stream.out()
}; };
let expected = Expect::Respond(packet::PROOFS, response); let expected = Expect::Respond(packet::RESPONSE, response);
proto.handle_packet(&expected, &1, packet::GET_PROOFS, &request_body); proto.handle_packet(&expected, &1, packet::REQUEST, &request_body);
} }
#[test] #[test]
@ -459,37 +495,31 @@ fn get_contract_code() {
} }
let req_id = 112; let req_id = 112;
let key1 = U256::from(11223344).into(); let key1: H256 = U256::from(11223344).into();
let key2 = U256::from(99988887).into(); let key2: H256 = U256::from(99988887).into();
let request = Request::Codes (request::ContractCodes { let request = Request::Code(IncompleteCodeRequest {
code_requests: vec![ block_hash: key1.into(),
request::ContractCode { block_hash: H256::default(), account_key: key1 }, code_hash: key2.into(),
request::ContractCode { block_hash: H256::default(), account_key: key2 },
],
}); });
let request_body = encode_request(&request, req_id); let requests = encode_single(request.clone());
let request_body = make_packet(req_id, &requests);
let response = { let response = {
let codes: Vec<Vec<_>> = vec![ let response = vec![Response::Code(CodeResponse {
key1.iter().chain(key1.iter()).cloned().collect(), code: key1.iter().chain(key2.iter()).cloned().collect(),
key2.iter().chain(key2.iter()).cloned().collect(), })];
];
let new_creds = *flow_params.limit() - flow_params.compute_cost(request::Kind::Codes, 2); let new_creds = *flow_params.limit() - flow_params.compute_cost_multi(requests.requests());
let mut response_stream = RlpStream::new_list(3); let mut response_stream = RlpStream::new_list(3);
response_stream.append(&req_id).append(&new_creds).begin_list(2); response_stream.append(&req_id).append(&new_creds).append_list(&response);
for code in codes {
response_stream.append(&code);
}
response_stream.out() response_stream.out()
}; };
let expected = Expect::Respond(packet::CONTRACT_CODES, response); let expected = Expect::Respond(packet::RESPONSE, response);
proto.handle_packet(&expected, &1, packet::GET_CONTRACT_CODES, &request_body); proto.handle_packet(&expected, &1, packet::REQUEST, &request_body);
} }
#[test] #[test]
@ -508,8 +538,8 @@ fn proof_of_execution() {
} }
let req_id = 112; let req_id = 112;
let mut request = Request::TransactionProof (request::TransactionProof { let mut request = Request::Execution(request::IncompleteExecutionRequest {
at: H256::default(), block_hash: H256::default().into(),
from: Address::default(), from: Address::default(),
action: Action::Call(Address::default()), action: Action::Call(Address::default()),
gas: 100.into(), gas: 100.into(),
@ -519,9 +549,11 @@ fn proof_of_execution() {
}); });
// first: a valid amount to request execution of. // first: a valid amount to request execution of.
let request_body = encode_request(&request, req_id); let requests = encode_single(request.clone());
let request_body = make_packet(req_id, &requests);
let response = { let response = {
let new_creds = *flow_params.limit() - flow_params.compute_cost(request::Kind::TransactionProof, 100); let new_creds = *flow_params.limit() - flow_params.compute_cost_multi(requests.requests());
let mut response_stream = RlpStream::new_list(3); let mut response_stream = RlpStream::new_list(3);
response_stream.append(&req_id).append(&new_creds).begin_list(0); response_stream.append(&req_id).append(&new_creds).begin_list(0);
@ -529,17 +561,19 @@ fn proof_of_execution() {
response_stream.out() response_stream.out()
}; };
let expected = Expect::Respond(packet::TRANSACTION_PROOF, response); let expected = Expect::Respond(packet::RESPONSE, response);
proto.handle_packet(&expected, &1, packet::GET_TRANSACTION_PROOF, &request_body); proto.handle_packet(&expected, &1, packet::REQUEST, &request_body);
// next: way too much requested gas. // next: way too much requested gas.
if let Request::TransactionProof(ref mut req) = request { if let Request::Execution(ref mut req) = request {
req.gas = 100_000_000.into(); req.gas = 100_000_000.into();
} }
let req_id = 113; let req_id = 113;
let request_body = encode_request(&request, req_id); let requests = encode_single(request.clone());
let request_body = make_packet(req_id, &requests);
let expected = Expect::Punish(1); let expected = Expect::Punish(1);
proto.handle_packet(&expected, &1, packet::GET_TRANSACTION_PROOF, &request_body); proto.handle_packet(&expected, &1, packet::REQUEST, &request_body);
} }
#[test] #[test]
@ -554,12 +588,13 @@ fn id_guard() {
let req_id_1 = ReqId(5143); let req_id_1 = ReqId(5143);
let req_id_2 = ReqId(1111); let req_id_2 = ReqId(1111);
let req = Request::Headers(request::Headers {
start: 5u64.into(), let req = encode_single(Request::Headers(IncompleteHeadersRequest {
start: HashOrNumber::Number(5u64).into(),
max: 100, max: 100,
skip: 0, skip: 0,
reverse: false, reverse: false,
}); }));
let peer_id = 9876; let peer_id = 9876;
@ -579,15 +614,15 @@ fn id_guard() {
failed_requests: Vec::new(), failed_requests: Vec::new(),
})); }));
// first, supply wrong request type. // first, malformed responses.
{ {
let mut stream = RlpStream::new_list(3); let mut stream = RlpStream::new_list(3);
stream.append(&req_id_1.0); stream.append(&req_id_1.0);
stream.append(&4_000_000usize); stream.append(&4_000_000usize);
stream.begin_list(0); stream.begin_list(2).append(&125usize).append(&3usize);
let packet = stream.out(); let packet = stream.out();
assert!(proto.block_bodies(&peer_id, &Expect::Nothing, UntrustedRlp::new(&packet)).is_err()); assert!(proto.response(&peer_id, &Expect::Nothing, UntrustedRlp::new(&packet)).is_err());
} }
// next, do an unexpected response. // next, do an unexpected response.
@ -598,7 +633,7 @@ fn id_guard() {
stream.begin_list(0); stream.begin_list(0);
let packet = stream.out(); let packet = stream.out();
assert!(proto.receipts(&peer_id, &Expect::Nothing, UntrustedRlp::new(&packet)).is_err()); assert!(proto.response(&peer_id, &Expect::Nothing, UntrustedRlp::new(&packet)).is_err());
} }
// lastly, do a valid (but empty) response. // lastly, do a valid (but empty) response.
@ -609,7 +644,7 @@ fn id_guard() {
stream.begin_list(0); stream.begin_list(0);
let packet = stream.out(); let packet = stream.out();
assert!(proto.block_headers(&peer_id, &Expect::Nothing, UntrustedRlp::new(&packet)).is_ok()); assert!(proto.response(&peer_id, &Expect::Nothing, UntrustedRlp::new(&packet)).is_ok());
} }
let peers = proto.peers.read(); let peers = proto.peers.read();

View File

@ -34,12 +34,12 @@ use futures::{Async, Poll, Future};
use futures::sync::oneshot::{self, Sender, Receiver}; use futures::sync::oneshot::{self, Sender, Receiver};
use network::PeerId; use network::PeerId;
use rlp::RlpStream; use rlp::RlpStream;
use util::{Bytes, DBValue, RwLock, Mutex, U256}; use util::{Bytes, RwLock, Mutex, U256, H256};
use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP}; use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP};
use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId};
use cache::Cache; use cache::Cache;
use types::les_request::{self as les_request, Request as LesRequest}; use request::{self as basic_request, Request as NetworkRequest, Response as NetworkResponse};
pub mod request; pub mod request;
@ -49,24 +49,85 @@ struct Peer {
capabilities: Capabilities, capabilities: Capabilities,
} }
impl Peer {
// Whether a given peer can handle a specific request.
fn can_handle(&self, pending: &Pending) -> bool {
match *pending {
Pending::HeaderProof(ref req, _) =>
self.capabilities.serve_headers && self.status.head_num > req.num(),
Pending::HeaderByHash(_, _) => self.capabilities.serve_headers,
Pending::Block(ref req, _) =>
self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= req.header.number()),
Pending::BlockReceipts(ref req, _) =>
self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= req.0.number()),
Pending::Account(ref req, _) =>
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= req.header.number()),
Pending::Code(ref req, _) =>
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= req.block_id.1),
Pending::TxProof(ref req, _) =>
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= req.header.number()),
}
}
}
// Which portions of a CHT proof should be sent. // Which portions of a CHT proof should be sent.
enum ChtProofSender { enum ChtProofSender {
Both(Sender<(encoded::Header, U256)>), Both(Sender<(H256, U256)>),
Header(Sender<encoded::Header>), Hash(Sender<H256>),
ChainScore(Sender<U256>), ChainScore(Sender<U256>),
} }
// Attempted request info and sender to put received value. // Attempted request info and sender to put received value.
enum Pending { enum Pending {
HeaderByNumber(request::HeaderByNumber, ChtProofSender), HeaderProof(request::HeaderProof, ChtProofSender),
HeaderByHash(request::HeaderByHash, Sender<encoded::Header>), HeaderByHash(request::HeaderByHash, Sender<encoded::Header>),
Block(request::Body, Sender<encoded::Block>), Block(request::Body, Sender<encoded::Block>),
BlockReceipts(request::BlockReceipts, Sender<Vec<Receipt>>), BlockReceipts(request::BlockReceipts, Sender<Vec<Receipt>>),
Account(request::Account, Sender<BasicAccount>), Account(request::Account, Sender<Option<BasicAccount>>),
Code(request::Code, Sender<Bytes>), Code(request::Code, Sender<Bytes>),
TxProof(request::TransactionProof, Sender<Result<Executed, ExecutionError>>), TxProof(request::TransactionProof, Sender<Result<Executed, ExecutionError>>),
} }
impl Pending {
// Create a network request.
fn make_request(&self) -> NetworkRequest {
match *self {
Pending::HeaderByHash(ref req, _) => NetworkRequest::Headers(basic_request::IncompleteHeadersRequest {
start: basic_request::HashOrNumber::Hash(req.0).into(),
skip: 0,
max: 1,
reverse: false,
}),
Pending::HeaderProof(ref req, _) => NetworkRequest::HeaderProof(basic_request::IncompleteHeaderProofRequest {
num: req.num().into(),
}),
Pending::Block(ref req, _) => NetworkRequest::Body(basic_request::IncompleteBodyRequest {
hash: req.hash.into(),
}),
Pending::BlockReceipts(ref req, _) => NetworkRequest::Receipts(basic_request::IncompleteReceiptsRequest {
hash: req.0.hash().into(),
}),
Pending::Account(ref req, _) => NetworkRequest::Account(basic_request::IncompleteAccountRequest {
block_hash: req.header.hash().into(),
address_hash: ::util::Hashable::sha3(&req.address).into(),
}),
Pending::Code(ref req, _) => NetworkRequest::Code(basic_request::IncompleteCodeRequest {
block_hash: req.block_id.0.into(),
code_hash: req.code_hash.into(),
}),
Pending::TxProof(ref req, _) => NetworkRequest::Execution(basic_request::IncompleteExecutionRequest {
block_hash: req.header.hash().into(),
from: req.tx.sender(),
gas: req.tx.gas,
gas_price: req.tx.gas_price,
action: req.tx.action.clone(),
value: req.tx.value,
data: req.tx.data.clone(),
}),
}
}
}
/// On demand request service. See module docs for more details. /// On demand request service. See module docs for more details.
/// Accumulates info about all peers' capabilities and dispatches /// Accumulates info about all peers' capabilities and dispatches
/// requests to them accordingly. /// requests to them accordingly.
@ -90,25 +151,25 @@ impl OnDemand {
} }
} }
/// Request a header by block number and CHT root hash. /// Request a header's hash by block number and CHT root hash.
/// Returns the header. /// Returns the hash.
pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<encoded::Header> { pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<H256> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
let cached = { let cached = {
let mut cache = self.cache.lock(); let mut cache = self.cache.lock();
cache.block_hash(&req.num()).and_then(|hash| cache.block_header(&hash)) cache.block_hash(&req.num())
}; };
match cached { match cached {
Some(hdr) => sender.send(hdr).expect(RECEIVER_IN_SCOPE), Some(hash) => sender.send(hash).expect(RECEIVER_IN_SCOPE),
None => self.dispatch_header_by_number(ctx, req, ChtProofSender::Header(sender)), None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Hash(sender))),
} }
receiver receiver
} }
/// Request a canonical block's chain score. /// Request a canonical block's chain score.
/// Returns the chain score. /// Returns the chain score.
pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<U256> { pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<U256> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
let cached = { let cached = {
let mut cache = self.cache.lock(); let mut cache = self.cache.lock();
@ -117,71 +178,33 @@ impl OnDemand {
match cached { match cached {
Some(score) => sender.send(score).expect(RECEIVER_IN_SCOPE), Some(score) => sender.send(score).expect(RECEIVER_IN_SCOPE),
None => self.dispatch_header_by_number(ctx, req, ChtProofSender::ChainScore(sender)), None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::ChainScore(sender))),
} }
receiver receiver
} }
/// Request a canonical block's chain score. /// Request a canonical block's hash and chain score by number.
/// Returns the header and chain score. /// Returns the hash and chain score.
pub fn header_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<(encoded::Header, U256)> { pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<(H256, U256)> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
let cached = { let cached = {
let mut cache = self.cache.lock(); let mut cache = self.cache.lock();
let hash = cache.block_hash(&req.num()); let hash = cache.block_hash(&req.num());
( (
hash.clone().and_then(|hash| cache.block_header(&hash)), hash.clone(),
hash.and_then(|hash| cache.chain_score(&hash)), hash.and_then(|hash| cache.chain_score(&hash)),
) )
}; };
match cached { match cached {
(Some(hdr), Some(score)) => sender.send((hdr, score)).expect(RECEIVER_IN_SCOPE), (Some(hash), Some(score)) => sender.send((hash, score)).expect(RECEIVER_IN_SCOPE),
_ => self.dispatch_header_by_number(ctx, req, ChtProofSender::Both(sender)), _ => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Both(sender))),
} }
receiver receiver
} }
// dispatch the request, completing the request if no peers available.
fn dispatch_header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber, sender: ChtProofSender) {
let num = req.num();
let cht_num = req.cht_num();
let les_req = LesRequest::HeaderProofs(les_request::HeaderProofs {
requests: vec![les_request::HeaderProof {
cht_number: cht_num,
block_number: num,
from_level: 0,
}],
});
let pending = Pending::HeaderByNumber(req, sender);
// we're looking for a peer with serveHeaders who's far enough along in the
// chain.
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_headers && peer.status.head_num >= num {
match ctx.request_from(*id, les_req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
pending,
);
return
},
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
}
trace!(target: "on_demand", "No suitable peer for request");
self.orphaned_requests.write().push(pending)
}
/// Request a header by hash. This is less accurate than by-number because we don't know /// Request a header by hash. This is less accurate than by-number because we don't know
/// where in the chain this header lies, and therefore can't find a peer who is supposed to have /// where in the chain this header lies, and therefore can't find a peer who is supposed to have
/// it as easily. /// it as easily.
@ -189,50 +212,11 @@ impl OnDemand {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
match self.cache.lock().block_header(&req.0) { match self.cache.lock().block_header(&req.0) {
Some(hdr) => sender.send(hdr).expect(RECEIVER_IN_SCOPE), Some(hdr) => sender.send(hdr).expect(RECEIVER_IN_SCOPE),
None => self.dispatch_header_by_hash(ctx, req, sender), None => self.dispatch(ctx, Pending::HeaderByHash(req, sender)),
} }
receiver receiver
} }
fn dispatch_header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash, sender: Sender<encoded::Header>) {
let les_req = LesRequest::Headers(les_request::Headers {
start: req.0.into(),
max: 1,
skip: 0,
reverse: false,
});
// all we've got is a hash, so we'll just guess at peers who might have
// it randomly.
let mut potential_peers = self.peers.read().iter()
.filter(|&(_, peer)| peer.capabilities.serve_headers)
.map(|(id, _)| *id)
.collect::<Vec<_>>();
let mut rng = ::rand::thread_rng();
::rand::Rng::shuffle(&mut rng, &mut potential_peers);
let pending = Pending::HeaderByHash(req, sender);
for id in potential_peers {
match ctx.request_from(id, les_req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
pending,
);
return
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
trace!(target: "on_demand", "No suitable peer for request");
self.orphaned_requests.write().push(pending)
}
/// Request a block, given its header. Block bodies are requestable by hash only, /// Request a block, given its header. Block bodies are requestable by hash only,
/// and the header is required anyway to verify and complete the block body /// and the header is required anyway to verify and complete the block body
/// -- this just doesn't obscure the network query. /// -- this just doesn't obscure the network query.
@ -246,7 +230,7 @@ impl OnDemand {
stream.begin_list(0); stream.begin_list(0);
stream.begin_list(0); stream.begin_list(0);
sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE) sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE);
} else { } else {
match self.cache.lock().block_body(&req.hash) { match self.cache.lock().block_body(&req.hash) {
Some(body) => { Some(body) => {
@ -254,43 +238,14 @@ impl OnDemand {
stream.append_raw(&req.header.into_inner(), 1); stream.append_raw(&req.header.into_inner(), 1);
stream.append_raw(&body.into_inner(), 2); stream.append_raw(&body.into_inner(), 2);
sender.complete(encoded::Block::new(stream.out())); sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE);
} }
None => self.dispatch_block(ctx, req, sender), None => self.dispatch(ctx, Pending::Block(req, sender)),
} }
} }
receiver receiver
} }
fn dispatch_block(&self, ctx: &BasicContext, req: request::Body, sender: Sender<encoded::Block>) {
let num = req.header.number();
let les_req = LesRequest::Bodies(les_request::Bodies {
block_hashes: vec![req.hash],
});
let pending = Pending::Block(req, sender);
// we're looking for a peer with serveChainSince(num)
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) {
match ctx.request_from(*id, les_req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
pending,
);
return
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
}
trace!(target: "on_demand", "No suitable peer for request");
self.orphaned_requests.write().push(pending)
}
/// Request the receipts for a block. The header serves two purposes: /// Request the receipts for a block. The header serves two purposes:
/// provide the block hash to fetch receipts for, and for verification of the receipts root. /// provide the block hash to fetch receipts for, and for verification of the receipts root.
pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Receiver<Vec<Receipt>> { pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Receiver<Vec<Receipt>> {
@ -298,88 +253,25 @@ impl OnDemand {
// fast path for empty receipts. // fast path for empty receipts.
if req.0.receipts_root() == SHA3_NULL_RLP { if req.0.receipts_root() == SHA3_NULL_RLP {
sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE) sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE);
} else { } else {
match self.cache.lock().block_receipts(&req.0.hash()) { match self.cache.lock().block_receipts(&req.0.hash()) {
Some(receipts) => sender.send(receipts).expect(RECEIVER_IN_SCOPE), Some(receipts) => sender.send(receipts).expect(RECEIVER_IN_SCOPE),
None => self.dispatch_block_receipts(ctx, req, sender), None => self.dispatch(ctx, Pending::BlockReceipts(req, sender)),
} }
} }
receiver receiver
} }
fn dispatch_block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts, sender: Sender<Vec<Receipt>>) {
let num = req.0.number();
let les_req = LesRequest::Receipts(les_request::Receipts {
block_hashes: vec![req.0.hash()],
});
let pending = Pending::BlockReceipts(req, sender);
// we're looking for a peer with serveChainSince(num)
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) {
match ctx.request_from(*id, les_req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
pending,
);
return
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
}
trace!(target: "on_demand", "No suitable peer for request");
self.orphaned_requests.write().push(pending)
}
/// Request an account by address and block header -- which gives a hash to query and a state root /// Request an account by address and block header -- which gives a hash to query and a state root
/// to verify against. /// to verify against.
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver<BasicAccount> { pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver<Option<BasicAccount>> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.dispatch_account(ctx, req, sender); self.dispatch(ctx, Pending::Account(req, sender));
receiver receiver
} }
fn dispatch_account(&self, ctx: &BasicContext, req: request::Account, sender: Sender<BasicAccount>) {
let num = req.header.number();
let les_req = LesRequest::StateProofs(les_request::StateProofs {
requests: vec![les_request::StateProof {
block: req.header.hash(),
key1: ::util::Hashable::sha3(&req.address),
key2: None,
from_level: 0,
}],
});
let pending = Pending::Account(req, sender);
// we're looking for a peer with serveStateSince(num)
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) {
match ctx.request_from(*id, les_req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
pending,
);
return
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
}
trace!(target: "on_demand", "No suitable peer for request");
self.orphaned_requests.write().push(pending)
}
/// Request code by address, known code hash, and block header. /// Request code by address, known code hash, and block header.
pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Receiver<Bytes> { pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Receiver<Bytes> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
@ -388,88 +280,50 @@ impl OnDemand {
if req.code_hash == ::util::sha3::SHA3_EMPTY { if req.code_hash == ::util::sha3::SHA3_EMPTY {
sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE) sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE)
} else { } else {
self.dispatch_code(ctx, req, sender); self.dispatch(ctx, Pending::Code(req, sender));
} }
receiver receiver
} }
fn dispatch_code(&self, ctx: &BasicContext, req: request::Code, sender: Sender<Bytes>) {
let num = req.block_id.1;
let les_req = LesRequest::Codes(les_request::ContractCodes {
code_requests: vec![les_request::ContractCode {
block_hash: req.block_id.0,
account_key: ::util::Hashable::sha3(&req.address),
}]
});
let pending = Pending::Code(req, sender);
// we're looking for a peer with serveStateSince(num)
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) {
match ctx.request_from(*id, les_req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
pending
);
return
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
}
trace!(target: "on_demand", "No suitable peer for request");
self.orphaned_requests.write().push(pending)
}
/// Request proof-of-execution for a transaction. /// Request proof-of-execution for a transaction.
pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> Receiver<Result<Executed, ExecutionError>> { pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> Receiver<Result<Executed, ExecutionError>> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.dispatch_transaction_proof(ctx, req, sender); self.dispatch(ctx, Pending::TxProof(req, sender));
receiver receiver
} }
fn dispatch_transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof, sender: Sender<Result<Executed, ExecutionError>>) { // dispatch the request, with a "suitability" function to filter acceptable peers.
let num = req.header.number(); fn dispatch(&self, ctx: &BasicContext, pending: Pending) {
let les_req = LesRequest::TransactionProof(les_request::TransactionProof { let mut builder = basic_request::RequestBuilder::default();
at: req.header.hash(), builder.push(pending.make_request())
from: req.tx.sender(), .expect("make_request always returns fully complete request; qed");
gas: req.tx.gas,
gas_price: req.tx.gas_price, let complete = builder.build();
action: req.tx.action.clone(),
value: req.tx.value,
data: req.tx.data.clone(),
});
let pending = Pending::TxProof(req, sender);
// we're looking for a peer with serveStateSince(num)
for (id, peer) in self.peers.read().iter() { for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) { if !peer.can_handle(&pending) { continue }
match ctx.request_from(*id, les_req.clone()) { match ctx.request_from(*id, complete.clone()) {
Ok(req_id) => { Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id); trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert( self.pending_requests.write().insert(
req_id, req_id,
pending pending,
); );
return return
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
} }
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
} }
} }
trace!(target: "on_demand", "No suitable peer for request"); trace!(target: "on_demand", "No suitable peer for request");
self.orphaned_requests.write().push(pending) self.orphaned_requests.write().push(pending);
} }
// dispatch orphaned requests, and discard those for which the corresponding // dispatch orphaned requests, and discard those for which the corresponding
// receiver has been dropped. // receiver has been dropped.
fn dispatch_orphaned(&self, ctx: &BasicContext) { fn dispatch_orphaned(&self, ctx: &BasicContext) {
@ -499,30 +353,22 @@ impl OnDemand {
let to_dispatch = ::std::mem::replace(&mut *self.orphaned_requests.write(), Vec::new()); let to_dispatch = ::std::mem::replace(&mut *self.orphaned_requests.write(), Vec::new());
for orphaned in to_dispatch { for mut orphaned in to_dispatch {
match orphaned { let hung_up = match orphaned {
Pending::HeaderByNumber(req, mut sender) => { Pending::HeaderProof(_, ref mut sender) => match *sender {
let hangup = match sender {
ChtProofSender::Both(ref mut s) => check_hangup(s), ChtProofSender::Both(ref mut s) => check_hangup(s),
ChtProofSender::Header(ref mut s) => check_hangup(s), ChtProofSender::Hash(ref mut s) => check_hangup(s),
ChtProofSender::ChainScore(ref mut s) => check_hangup(s), ChtProofSender::ChainScore(ref mut s) => check_hangup(s),
}; },
Pending::HeaderByHash(_, ref mut sender) => check_hangup(sender),
Pending::Block(_, ref mut sender) => check_hangup(sender),
Pending::BlockReceipts(_, ref mut sender) => check_hangup(sender),
Pending::Account(_, ref mut sender) => check_hangup(sender),
Pending::Code(_, ref mut sender) => check_hangup(sender),
Pending::TxProof(_, ref mut sender) => check_hangup(sender),
};
if !hangup { self.dispatch_header_by_number(ctx, req, sender) } if !hung_up { self.dispatch(ctx, orphaned) }
}
Pending::HeaderByHash(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_header_by_hash(ctx, req, sender) },
Pending::Block(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_block(ctx, req, sender) },
Pending::BlockReceipts(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_block_receipts(ctx, req, sender) },
Pending::Account(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_account(ctx, req, sender) },
Pending::Code(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_code(ctx, req, sender) },
Pending::TxProof(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_transaction_proof(ctx, req, sender) }
}
} }
} }
} }
@ -560,218 +406,126 @@ impl Handler for OnDemand {
self.dispatch_orphaned(ctx.as_basic()); self.dispatch_orphaned(ctx.as_basic());
} }
fn on_header_proofs(&self, ctx: &EventContext, req_id: ReqId, proofs: &[(Bytes, Vec<Bytes>)]) { fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) {
let peer = ctx.peer(); let peer = ctx.peer();
let req = match self.pending_requests.write().remove(&req_id) { let req = match self.pending_requests.write().remove(&req_id) {
Some(req) => req, Some(req) => req,
None => return, None => return,
}; };
let response = match responses.get(0) {
Some(response) => response,
None => {
trace!(target: "on_demand", "Ignoring empty response for request {}", req_id);
self.dispatch(ctx.as_basic(), req);
return;
}
};
// handle the response appropriately for the request.
// all branches which do not return early lead to disabling of the peer
// due to misbehavior.
match req { match req {
Pending::HeaderByNumber(req, sender) => { Pending::HeaderProof(req, sender) => {
if let Some(&(ref header, ref proof)) = proofs.get(0) { if let NetworkResponse::HeaderProof(ref response) = *response {
match req.check_response(header, proof) { match req.check_response(&response.proof) {
Ok((header, score)) => { Ok((hash, score)) => {
let mut cache = self.cache.lock(); let mut cache = self.cache.lock();
let hash = header.hash(); cache.insert_block_hash(req.num(), hash);
cache.insert_block_header(hash, header.clone());
cache.insert_block_hash(header.number(), hash);
cache.insert_chain_score(hash, score); cache.insert_chain_score(hash, score);
match sender { match sender {
ChtProofSender::Both(sender) => sender.complete((header, score)), ChtProofSender::Both(sender) => { let _ = sender.send((hash, score)); }
ChtProofSender::Header(sender) => sender.complete(header), ChtProofSender::Hash(sender) => { let _ = sender.send(hash); }
ChtProofSender::ChainScore(sender) => sender.complete(score), ChtProofSender::ChainScore(sender) => { let _ = sender.send(score); }
} }
return return
} }
Err(e) => { Err(e) => warn!("Error handling response for header request: {:?}", e),
warn!("Error handling response for header request: {:?}", e);
ctx.disable_peer(peer);
}
} }
} }
self.dispatch_header_by_number(ctx.as_basic(), req, sender);
} }
_ => panic!("Only header by number request fetches header proofs; qed"),
}
}
fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) {
let peer = ctx.peer();
let req = match self.pending_requests.write().remove(&req_id) {
Some(req) => req,
None => return,
};
match req {
Pending::HeaderByHash(req, sender) => { Pending::HeaderByHash(req, sender) => {
if let Some(ref header) = headers.get(0) { if let NetworkResponse::Headers(ref response) = *response {
match req.check_response(header) { if let Some(header) = response.headers.get(0) {
Ok(header) => { match req.check_response(header) {
self.cache.lock().insert_block_header(req.0, header.clone()); Ok(header) => {
sender.complete(header); self.cache.lock().insert_block_header(req.0, header.clone());
return let _ = sender.send(header);
} return
Err(e) => { }
warn!("Error handling response for header request: {:?}", e); Err(e) => warn!("Error handling response for header request: {:?}", e),
ctx.disable_peer(peer);
} }
} }
} }
self.dispatch_header_by_hash(ctx.as_basic(), req, sender);
} }
_ => panic!("Only header by hash request fetches headers; qed"),
}
}
fn on_block_bodies(&self, ctx: &EventContext, req_id: ReqId, bodies: &[Bytes]) {
let peer = ctx.peer();
let req = match self.pending_requests.write().remove(&req_id) {
Some(req) => req,
None => return,
};
match req {
Pending::Block(req, sender) => { Pending::Block(req, sender) => {
if let Some(ref body) = bodies.get(0) { if let NetworkResponse::Body(ref response) = *response {
match req.check_response(body) { match req.check_response(&response.body) {
Ok(block) => { Ok(block) => {
let body = encoded::Body::new(body.to_vec()); self.cache.lock().insert_block_body(req.hash, response.body.clone());
self.cache.lock().insert_block_body(req.hash, body); let _ = sender.send(block);
sender.complete(block);
return return
} }
Err(e) => { Err(e) => warn!("Error handling response for block request: {:?}", e),
warn!("Error handling response for block request: {:?}", e);
ctx.disable_peer(peer);
}
} }
} }
self.dispatch_block(ctx.as_basic(), req, sender);
} }
_ => panic!("Only block request fetches bodies; qed"),
}
}
fn on_receipts(&self, ctx: &EventContext, req_id: ReqId, receipts: &[Vec<Receipt>]) {
let peer = ctx.peer();
let req = match self.pending_requests.write().remove(&req_id) {
Some(req) => req,
None => return,
};
match req {
Pending::BlockReceipts(req, sender) => { Pending::BlockReceipts(req, sender) => {
if let Some(ref receipts) = receipts.get(0) { if let NetworkResponse::Receipts(ref response) = *response {
match req.check_response(receipts) { match req.check_response(&response.receipts) {
Ok(receipts) => { Ok(receipts) => {
let hash = req.0.hash(); let hash = req.0.hash();
self.cache.lock().insert_block_receipts(hash, receipts.clone()); self.cache.lock().insert_block_receipts(hash, receipts.clone());
sender.complete(receipts); let _ = sender.send(receipts);
return return
} }
Err(e) => { Err(e) => warn!("Error handling response for receipts request: {:?}", e),
warn!("Error handling response for receipts request: {:?}", e);
ctx.disable_peer(peer);
}
} }
} }
self.dispatch_block_receipts(ctx.as_basic(), req, sender);
} }
_ => panic!("Only receipts request fetches receipts; qed"),
}
}
fn on_state_proofs(&self, ctx: &EventContext, req_id: ReqId, proofs: &[Vec<Bytes>]) {
let peer = ctx.peer();
let req = match self.pending_requests.write().remove(&req_id) {
Some(req) => req,
None => return,
};
match req {
Pending::Account(req, sender) => { Pending::Account(req, sender) => {
if let Some(ref proof) = proofs.get(0) { if let NetworkResponse::Account(ref response) = *response {
match req.check_response(proof) { match req.check_response(&response.proof) {
Ok(proof) => { Ok(maybe_account) => {
sender.complete(proof); // TODO: validate against request outputs.
// needs engine + env info as part of request.
let _ = sender.send(maybe_account);
return return
} }
Err(e) => { Err(e) => warn!("Error handling response for state request: {:?}", e),
warn!("Error handling response for state request: {:?}", e);
ctx.disable_peer(peer);
}
} }
} }
self.dispatch_account(ctx.as_basic(), req, sender);
} }
_ => panic!("Only account request fetches state proof; qed"),
}
}
fn on_code(&self, ctx: &EventContext, req_id: ReqId, codes: &[Bytes]) {
let peer = ctx.peer();
let req = match self.pending_requests.write().remove(&req_id) {
Some(req) => req,
None => return,
};
match req {
Pending::Code(req, sender) => { Pending::Code(req, sender) => {
if let Some(code) = codes.get(0) { if let NetworkResponse::Code(ref response) = *response {
match req.check_response(code.as_slice()) { match req.check_response(response.code.as_slice()) {
Ok(()) => { Ok(()) => {
sender.complete(code.clone()); let _ = sender.send(response.code.clone());
return return
} }
Err(e) => { Err(e) => warn!("Error handling response for code request: {:?}", e),
warn!("Error handling response for code request: {:?}", e);
ctx.disable_peer(peer);
}
} }
self.dispatch_code(ctx.as_basic(), req, sender);
} }
} }
_ => panic!("Only code request fetches code; qed"),
}
}
fn on_transaction_proof(&self, ctx: &EventContext, req_id: ReqId, items: &[DBValue]) {
let peer = ctx.peer();
let req = match self.pending_requests.write().remove(&req_id) {
Some(req) => req,
None => return,
};
match req {
Pending::TxProof(req, sender) => { Pending::TxProof(req, sender) => {
match req.check_response(items) { if let NetworkResponse::Execution(ref response) = *response {
ProvedExecution::Complete(executed) => { match req.check_response(&response.items) {
sender.complete(Ok(executed)); ProvedExecution::Complete(executed) => {
return let _ = sender.send(Ok(executed));
} return
ProvedExecution::Failed(err) => { }
sender.complete(Err(err)); ProvedExecution::Failed(err) => {
return let _ = sender.send(Err(err));
} return
ProvedExecution::BadProof => { }
warn!("Error handling response for transaction proof request"); ProvedExecution::BadProof => warn!("Error handling response for transaction proof request"),
ctx.disable_peer(peer);
} }
} }
self.dispatch_transaction_proof(ctx.as_basic(), req, sender);
} }
_ => panic!("Only transaction proof request dispatches transaction proof requests; qed"),
} }
ctx.disable_peer(peer);
} }
fn tick(&self, ctx: &BasicContext) { fn tick(&self, ctx: &BasicContext) {
@ -787,7 +541,7 @@ mod tests {
use cache::Cache; use cache::Cache;
use net::{Announcement, BasicContext, ReqId, Error as LesError}; use net::{Announcement, BasicContext, ReqId, Error as LesError};
use request::{Request as LesRequest, Kind as LesRequestKind}; use request::Requests;
use network::{PeerId, NodeId}; use network::{PeerId, NodeId};
use time::Duration; use time::Duration;
@ -797,11 +551,10 @@ mod tests {
impl BasicContext for FakeContext { impl BasicContext for FakeContext {
fn persistent_peer_id(&self, _: PeerId) -> Option<NodeId> { None } fn persistent_peer_id(&self, _: PeerId) -> Option<NodeId> { None }
fn request_from(&self, _: PeerId, _: LesRequest) -> Result<ReqId, LesError> { fn request_from(&self, _: PeerId, _: Requests) -> Result<ReqId, LesError> {
unimplemented!() unimplemented!()
} }
fn make_announcement(&self, _: Announcement) { } fn make_announcement(&self, _: Announcement) { }
fn max_requests(&self, _: PeerId, _: LesRequestKind) -> usize { 0 }
fn disconnect_peer(&self, _: PeerId) { } fn disconnect_peer(&self, _: PeerId) { }
fn disable_peer(&self, _: PeerId) { } fn disable_peer(&self, _: PeerId) { }
} }

View File

@ -61,9 +61,9 @@ impl From<Box<TrieError>> for Error {
} }
} }
/// Request for a header by number. /// Request for header proof by number
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeaderByNumber { pub struct HeaderProof {
/// The header's number. /// The header's number.
num: u64, num: u64,
/// The cht number for the given block number. /// The cht number for the given block number.
@ -72,11 +72,11 @@ pub struct HeaderByNumber {
cht_root: H256, cht_root: H256,
} }
impl HeaderByNumber { impl HeaderProof {
/// Construct a new header-by-number request. Fails if the given number is 0. /// Construct a new header-by-number request. Fails if the given number is 0.
/// Provide the expected CHT root to compare against. /// Provide the expected CHT root to compare against.
pub fn new(num: u64, cht_root: H256) -> Option<Self> { pub fn new(num: u64, cht_root: H256) -> Option<Self> {
::cht::block_to_cht_number(num).map(|cht_num| HeaderByNumber { ::cht::block_to_cht_number(num).map(|cht_num| HeaderProof {
num: num, num: num,
cht_num: cht_num, cht_num: cht_num,
cht_root: cht_root, cht_root: cht_root,
@ -92,18 +92,11 @@ impl HeaderByNumber {
/// Access the expected CHT root. /// Access the expected CHT root.
pub fn cht_root(&self) -> H256 { self.cht_root } pub fn cht_root(&self) -> H256 { self.cht_root }
/// Check a response with a header and cht proof. /// Check a response with a CHT proof, get a hash and total difficulty back.
pub fn check_response(&self, header: &[u8], proof: &[Bytes]) -> Result<(encoded::Header, U256), Error> { pub fn check_response(&self, proof: &[Bytes]) -> Result<(H256, U256), Error> {
let (expected_hash, td) = match ::cht::check_proof(proof, self.num, self.cht_root) { match ::cht::check_proof(proof, self.num, self.cht_root) {
Some((expected_hash, td)) => (expected_hash, td), Some((expected_hash, td)) => Ok((expected_hash, td)),
None => return Err(Error::BadProof), None => Err(Error::BadProof),
};
// and compare the hash to the found header.
let found_hash = header.sha3();
match expected_hash == found_hash {
true => Ok((encoded::Header::new(header.to_vec()), td)),
false => Err(Error::WrongHash(expected_hash, found_hash)),
} }
} }
} }
@ -114,10 +107,10 @@ pub struct HeaderByHash(pub H256);
impl HeaderByHash { impl HeaderByHash {
/// Check a response for the header. /// Check a response for the header.
pub fn check_response(&self, header: &[u8]) -> Result<encoded::Header, Error> { pub fn check_response(&self, header: &encoded::Header) -> Result<encoded::Header, Error> {
let hash = header.sha3(); let hash = header.sha3();
match hash == self.0 { match hash == self.0 {
true => Ok(encoded::Header::new(header.to_vec())), true => Ok(header.clone()),
false => Err(Error::WrongHash(self.0, hash)), false => Err(Error::WrongHash(self.0, hash)),
} }
} }
@ -143,16 +136,14 @@ impl Body {
} }
/// Check a response for this block body. /// Check a response for this block body.
pub fn check_response(&self, body: &[u8]) -> Result<encoded::Block, Error> { pub fn check_response(&self, body: &encoded::Body) -> Result<encoded::Block, Error> {
let body_view = UntrustedRlp::new(&body);
// check the integrity of the the body against the header // check the integrity of the the body against the header
let tx_root = ::util::triehash::ordered_trie_root(body_view.at(0)?.iter().map(|r| r.as_raw().to_vec())); let tx_root = ::util::triehash::ordered_trie_root(body.rlp().at(0).iter().map(|r| r.as_raw().to_vec()));
if tx_root != self.header.transactions_root() { if tx_root != self.header.transactions_root() {
return Err(Error::WrongTrieRoot(self.header.transactions_root(), tx_root)); return Err(Error::WrongTrieRoot(self.header.transactions_root(), tx_root));
} }
let uncles_hash = body_view.at(1)?.as_raw().sha3(); let uncles_hash = body.rlp().at(1).as_raw().sha3();
if uncles_hash != self.header.uncles_hash() { if uncles_hash != self.header.uncles_hash() {
return Err(Error::WrongHash(self.header.uncles_hash(), uncles_hash)); return Err(Error::WrongHash(self.header.uncles_hash(), uncles_hash));
} }
@ -160,7 +151,7 @@ impl Body {
// concatenate the header and the body. // concatenate the header and the body.
let mut stream = RlpStream::new_list(3); let mut stream = RlpStream::new_list(3);
stream.append_raw(self.header.rlp().as_raw(), 1); stream.append_raw(self.header.rlp().as_raw(), 1);
stream.append_raw(body, 2); stream.append_raw(&body.rlp().as_raw(), 2);
Ok(encoded::Block::new(stream.out())) Ok(encoded::Block::new(stream.out()))
} }
@ -194,7 +185,7 @@ pub struct Account {
impl Account { impl Account {
/// Check a response with an account against the stored header. /// Check a response with an account against the stored header.
pub fn check_response(&self, proof: &[Bytes]) -> Result<BasicAccount, Error> { pub fn check_response(&self, proof: &[Bytes]) -> Result<Option<BasicAccount>, Error> {
let state_root = self.header.state_root(); let state_root = self.header.state_root();
let mut db = MemoryDB::new(); let mut db = MemoryDB::new();
@ -203,14 +194,14 @@ impl Account {
match TrieDB::new(&db, &state_root).and_then(|t| t.get(&self.address.sha3()))? { match TrieDB::new(&db, &state_root).and_then(|t| t.get(&self.address.sha3()))? {
Some(val) => { Some(val) => {
let rlp = UntrustedRlp::new(&val); let rlp = UntrustedRlp::new(&val);
Ok(BasicAccount { Ok(Some(BasicAccount {
nonce: rlp.val_at(0)?, nonce: rlp.val_at(0)?,
balance: rlp.val_at(1)?, balance: rlp.val_at(1)?,
storage_root: rlp.val_at(2)?, storage_root: rlp.val_at(2)?,
code_hash: rlp.val_at(3)?, code_hash: rlp.val_at(3)?,
}) }))
}, },
None => Err(Error::BadProof) None => Ok(None),
} }
} }
} }
@ -219,8 +210,6 @@ impl Account {
pub struct Code { pub struct Code {
/// Block hash, number pair. /// Block hash, number pair.
pub block_id: (H256, u64), pub block_id: (H256, u64),
/// Address requested.
pub address: Address,
/// Account's code hash. /// Account's code hash.
pub code_hash: H256, pub code_hash: H256,
} }
@ -278,11 +267,11 @@ mod tests {
#[test] #[test]
fn no_invalid_header_by_number() { fn no_invalid_header_by_number() {
assert!(HeaderByNumber::new(0, Default::default()).is_none()) assert!(HeaderProof::new(0, Default::default()).is_none())
} }
#[test] #[test]
fn check_header_by_number() { fn check_header_proof() {
use ::cht; use ::cht;
let test_client = TestBlockChainClient::new(); let test_client = TestBlockChainClient::new();
@ -303,11 +292,9 @@ mod tests {
}; };
let proof = cht.prove(10_000, 0).unwrap().unwrap(); let proof = cht.prove(10_000, 0).unwrap().unwrap();
let req = HeaderByNumber::new(10_000, cht.root()).unwrap(); let req = HeaderProof::new(10_000, cht.root()).unwrap();
let raw_header = test_client.block_header(::ethcore::ids::BlockId::Number(10_000)).unwrap(); assert!(req.check_response(&proof[..]).is_ok());
assert!(req.check_response(&raw_header.into_inner(), &proof[..]).is_ok());
} }
#[test] #[test]
@ -316,9 +303,9 @@ mod tests {
header.set_number(10_000); header.set_number(10_000);
header.set_extra_data(b"test_header".to_vec()); header.set_extra_data(b"test_header".to_vec());
let hash = header.hash(); let hash = header.hash();
let raw_header = ::rlp::encode(&header); let raw_header = encoded::Header::new(::rlp::encode(&header).to_vec());
assert!(HeaderByHash(hash).check_response(&*raw_header).is_ok()) assert!(HeaderByHash(hash).check_response(&raw_header).is_ok())
} }
#[test] #[test]
@ -334,7 +321,8 @@ mod tests {
hash: header.hash(), hash: header.hash(),
}; };
assert!(req.check_response(&*body_stream.drain()).is_ok()) let response = encoded::Body::new(body_stream.drain().to_vec());
assert!(req.check_response(&response).is_ok())
} }
#[test] #[test]
@ -412,7 +400,6 @@ mod tests {
let code = vec![1u8; 256]; let code = vec![1u8; 256];
let req = Code { let req = Code {
block_id: (Default::default(), 2), block_id: (Default::default(), 2),
address: Default::default(),
code_hash: ::util::Hashable::sha3(&code), code_hash: ::util::Hashable::sha3(&code),
}; };

View File

@ -24,22 +24,15 @@ use ethcore::client::{BlockChainClient, ProvingBlockChainClient};
use ethcore::transaction::PendingTransaction; use ethcore::transaction::PendingTransaction;
use ethcore::ids::BlockId; use ethcore::ids::BlockId;
use ethcore::encoded; use ethcore::encoded;
use util::{Bytes, DBValue, RwLock, H256}; use util::{RwLock, H256};
use cht::{self, BlockInfo}; use cht::{self, BlockInfo};
use client::{LightChainClient, AsLightClient}; use client::{LightChainClient, AsLightClient};
use transaction_queue::TransactionQueue; use transaction_queue::TransactionQueue;
use request; use request;
/// Defines the operations that a provider for `LES` must fulfill. /// Defines the operations that a provider for the light subprotocol must fulfill.
///
/// These are defined at [1], but may be subject to change.
/// Requests which can't be fulfilled should return either an empty RLP list
/// or empty vector where appropriate.
///
/// [1]: https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES)
#[cfg_attr(feature = "ipc", ipc(client_ident="LightProviderClient"))] #[cfg_attr(feature = "ipc", ipc(client_ident="LightProviderClient"))]
pub trait Provider: Send + Sync { pub trait Provider: Send + Sync {
/// Provide current blockchain info. /// Provide current blockchain info.
@ -59,18 +52,18 @@ pub trait Provider: Send + Sync {
/// ///
/// The returned vector may have any length in the range [0, `max`], but the /// The returned vector may have any length in the range [0, `max`], but the
/// results within must adhere to the `skip` and `reverse` parameters. /// results within must adhere to the `skip` and `reverse` parameters.
fn block_headers(&self, req: request::Headers) -> Vec<encoded::Header> { fn block_headers(&self, req: request::CompleteHeadersRequest) -> Option<request::HeadersResponse> {
use request::HashOrNumber; use request::HashOrNumber;
if req.max == 0 { return Vec::new() } if req.max == 0 { return None }
let best_num = self.chain_info().best_block_number; let best_num = self.chain_info().best_block_number;
let start_num = match req.start { let start_num = match req.start {
HashOrNumber::Number(start_num) => start_num, HashOrNumber::Number(start_num) => start_num,
HashOrNumber::Hash(hash) => match self.block_header(BlockId::Hash(hash)) { HashOrNumber::Hash(hash) => match self.block_header(BlockId::Hash(hash)) {
None => { None => {
trace!(target: "les_provider", "Unknown block hash {} requested", hash); trace!(target: "pip_provider", "Unknown block hash {} requested", hash);
return Vec::new(); return None;
} }
Some(header) => { Some(header) => {
let num = header.number(); let num = header.number();
@ -79,7 +72,9 @@ pub trait Provider: Send + Sync {
if req.max == 1 || canon_hash != Some(hash) { if req.max == 1 || canon_hash != Some(hash) {
// Non-canonical header or single header requested. // Non-canonical header or single header requested.
return vec![header]; return Some(::request::HeadersResponse {
headers: vec![header],
})
} }
num num
@ -87,116 +82,50 @@ pub trait Provider: Send + Sync {
} }
}; };
(0u64..req.max as u64) let headers: Vec<_> = (0u64..req.max as u64)
.map(|x: u64| x.saturating_mul(req.skip + 1)) .map(|x: u64| x.saturating_mul(req.skip + 1))
.take_while(|x| if req.reverse { x < &start_num } else { best_num.saturating_sub(start_num) >= *x }) .take_while(|x| if req.reverse { x < &start_num } else { best_num.saturating_sub(start_num) >= *x })
.map(|x| if req.reverse { start_num - x } else { start_num + x }) .map(|x| if req.reverse { start_num - x } else { start_num + x })
.map(|x| self.block_header(BlockId::Number(x))) .map(|x| self.block_header(BlockId::Number(x)))
.take_while(|x| x.is_some()) .take_while(|x| x.is_some())
.flat_map(|x| x) .flat_map(|x| x)
.collect() .collect();
if headers.is_empty() {
None
} else {
Some(::request::HeadersResponse { headers: headers })
}
} }
/// Get a block header by id. /// Get a block header by id.
fn block_header(&self, id: BlockId) -> Option<encoded::Header>; fn block_header(&self, id: BlockId) -> Option<encoded::Header>;
/// Provide as many as possible of the requested blocks (minus the headers) encoded /// Fulfill a block body request.
/// in RLP format. fn block_body(&self, req: request::CompleteBodyRequest) -> Option<request::BodyResponse>;
fn block_bodies(&self, req: request::Bodies) -> Vec<Option<encoded::Body>> {
req.block_hashes.into_iter()
.map(|hash| self.block_body(BlockId::Hash(hash)))
.collect()
}
/// Get a block body by id. /// Fulfill a request for block receipts.
fn block_body(&self, id: BlockId) -> Option<encoded::Body>; fn block_receipts(&self, req: request::CompleteReceiptsRequest) -> Option<request::ReceiptsResponse>;
/// Provide the receipts as many as possible of the requested blocks. /// Get an account proof.
/// Returns a vector of RLP-encoded lists of receipts. fn account_proof(&self, req: request::CompleteAccountRequest) -> Option<request::AccountResponse>;
fn receipts(&self, req: request::Receipts) -> Vec<Bytes> {
req.block_hashes.into_iter()
.map(|hash| self.block_receipts(&hash))
.map(|receipts| receipts.unwrap_or_else(|| ::rlp::EMPTY_LIST_RLP.to_vec()))
.collect()
}
/// Get a block's receipts as an RLP-encoded list by block hash. /// Get a storage proof.
fn block_receipts(&self, hash: &H256) -> Option<Bytes>; fn storage_proof(&self, req: request::CompleteStorageRequest) -> Option<request::StorageResponse>;
/// Provide a set of merkle proofs, as requested. Each request is a /// Provide contract code for the specified (block_hash, code_hash) pair.
/// block hash and request parameters. fn contract_code(&self, req: request::CompleteCodeRequest) -> Option<request::CodeResponse>;
///
/// Returns a vector of RLP-encoded lists satisfying the requests.
fn proofs(&self, req: request::StateProofs) -> Vec<Bytes> {
use rlp::RlpStream;
let mut results = Vec::with_capacity(req.requests.len());
for request in req.requests {
let proof = self.state_proof(request);
let mut stream = RlpStream::new_list(proof.len());
for node in proof {
stream.append_raw(&node, 1);
}
results.push(stream.out());
}
results
}
/// Get a state proof from a request. Each proof should be a vector
/// of rlp-encoded trie nodes, in ascending order by distance from the root.
fn state_proof(&self, req: request::StateProof) -> Vec<Bytes>;
/// Provide contract code for the specified (block_hash, account_hash) pairs.
/// Each item in the resulting vector is either the raw bytecode or empty.
fn contract_codes(&self, req: request::ContractCodes) -> Vec<Bytes> {
req.code_requests.into_iter()
.map(|req| self.contract_code(req))
.collect()
}
/// Get contract code by request. Either the raw bytecode or empty.
fn contract_code(&self, req: request::ContractCode) -> Bytes;
/// Provide header proofs from the Canonical Hash Tries as well as the headers
/// they correspond to -- each element in the returned vector is a 2-tuple.
/// The first element is a block header and the second a merkle proof of
/// the header in a requested CHT.
fn header_proofs(&self, req: request::HeaderProofs) -> Vec<Bytes> {
use rlp::{self, RlpStream};
req.requests.into_iter()
.map(|req| self.header_proof(req))
.map(|maybe_proof| match maybe_proof {
None => rlp::EMPTY_LIST_RLP.to_vec(),
Some((header, proof)) => {
let mut stream = RlpStream::new_list(2);
stream.append_raw(&header.into_inner(), 1).begin_list(proof.len());
for node in proof {
stream.append_raw(&node, 1);
}
stream.out()
}
})
.collect()
}
/// Provide a header proof from a given Canonical Hash Trie as well as the /// Provide a header proof from a given Canonical Hash Trie as well as the
/// corresponding header. The first element is the block header and the /// corresponding header.
/// second is a merkle proof of the CHT. fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse>;
fn header_proof(&self, req: request::HeaderProof) -> Option<(encoded::Header, Vec<Bytes>)>;
/// Provide pending transactions. /// Provide pending transactions.
fn ready_transactions(&self) -> Vec<PendingTransaction>; fn ready_transactions(&self) -> Vec<PendingTransaction>;
/// Provide a proof-of-execution for the given transaction proof request. /// Provide a proof-of-execution for the given transaction proof request.
/// Returns a vector of all state items necessary to execute the transaction. /// Returns a vector of all state items necessary to execute the transaction.
fn transaction_proof(&self, req: request::TransactionProof) -> Option<Vec<DBValue>>; fn transaction_proof(&self, req: request::CompleteExecutionRequest) -> Option<request::ExecutionResponse>;
} }
// Implementation of a light client data provider for a client. // Implementation of a light client data provider for a client.
@ -217,32 +146,52 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
BlockChainClient::block_header(self, id) BlockChainClient::block_header(self, id)
} }
fn block_body(&self, id: BlockId) -> Option<encoded::Body> { fn block_body(&self, req: request::CompleteBodyRequest) -> Option<request::BodyResponse> {
BlockChainClient::block_body(self, id) BlockChainClient::block_body(self, BlockId::Hash(req.hash))
.map(|body| ::request::BodyResponse { body: body })
} }
fn block_receipts(&self, hash: &H256) -> Option<Bytes> { fn block_receipts(&self, req: request::CompleteReceiptsRequest) -> Option<request::ReceiptsResponse> {
BlockChainClient::block_receipts(self, hash) BlockChainClient::block_receipts(self, &req.hash)
.map(|x| ::request::ReceiptsResponse { receipts: ::rlp::decode_list(&x) })
} }
fn state_proof(&self, req: request::StateProof) -> Vec<Bytes> { fn account_proof(&self, req: request::CompleteAccountRequest) -> Option<request::AccountResponse> {
match req.key2 { self.prove_account(req.address_hash, BlockId::Hash(req.block_hash)).map(|(proof, acc)| {
Some(key2) => self.prove_storage(req.key1, key2, req.from_level, BlockId::Hash(req.block)), ::request::AccountResponse {
None => self.prove_account(req.key1, req.from_level, BlockId::Hash(req.block)), proof: proof,
} nonce: acc.nonce,
balance: acc.balance,
code_hash: acc.code_hash,
storage_root: acc.storage_root,
}
})
} }
fn contract_code(&self, req: request::ContractCode) -> Bytes { fn storage_proof(&self, req: request::CompleteStorageRequest) -> Option<request::StorageResponse> {
self.code_by_hash(req.account_key, BlockId::Hash(req.block_hash)) self.prove_storage(req.address_hash, req.key_hash, BlockId::Hash(req.block_hash)).map(|(proof, item) | {
::request::StorageResponse {
proof: proof,
value: item,
}
})
} }
fn header_proof(&self, req: request::HeaderProof) -> Option<(encoded::Header, Vec<Bytes>)> { fn contract_code(&self, req: request::CompleteCodeRequest) -> Option<request::CodeResponse> {
if Some(req.cht_number) != cht::block_to_cht_number(req.block_number) { self.state_data(&req.code_hash)
debug!(target: "les_provider", "Requested CHT number mismatch with block number."); .map(|code| ::request::CodeResponse { code: code })
return None; }
}
let mut needed_hdr = None; fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse> {
let cht_number = match cht::block_to_cht_number(req.num) {
Some(cht_num) => cht_num,
None => {
debug!(target: "pip_provider", "Requested CHT proof with invalid block number");
return None;
}
};
let mut needed = None;
// build the CHT, caching the requested header as we pass through it. // build the CHT, caching the requested header as we pass through it.
let cht = { let cht = {
@ -258,8 +207,8 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
total_difficulty: td, total_difficulty: td,
}; };
if hdr.number() == req.block_number { if hdr.number() == req.num {
needed_hdr = Some(hdr); needed = Some((hdr, td));
} }
Some(info) Some(info)
@ -268,29 +217,33 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
} }
}; };
match cht::build(req.cht_number, block_info) { match cht::build(cht_number, block_info) {
Some(cht) => cht, Some(cht) => cht,
None => return None, // incomplete CHT. None => return None, // incomplete CHT.
} }
}; };
let needed_hdr = needed_hdr.expect("`needed_hdr` always set in loop, number checked before; qed"); let (needed_hdr, needed_td) = needed.expect("`needed` always set in loop, number checked before; qed");
// prove our result. // prove our result.
match cht.prove(req.block_number, req.from_level) { match cht.prove(req.num, 0) {
Ok(Some(proof)) => Some((needed_hdr, proof)), Ok(Some(proof)) => Some(::request::HeaderProofResponse {
proof: proof,
hash: needed_hdr.hash(),
td: needed_td,
}),
Ok(None) => None, Ok(None) => None,
Err(e) => { Err(e) => {
debug!(target: "les_provider", "Error looking up number in freshly-created CHT: {}", e); debug!(target: "pip_provider", "Error looking up number in freshly-created CHT: {}", e);
None None
} }
} }
} }
fn transaction_proof(&self, req: request::TransactionProof) -> Option<Vec<DBValue>> { fn transaction_proof(&self, req: request::CompleteExecutionRequest) -> Option<request::ExecutionResponse> {
use ethcore::transaction::Transaction; use ethcore::transaction::Transaction;
let id = BlockId::Hash(req.at); let id = BlockId::Hash(req.block_hash);
let nonce = match self.nonce(&req.from, id.clone()) { let nonce = match self.nonce(&req.from, id.clone()) {
Some(nonce) => nonce, Some(nonce) => nonce,
None => return None, None => return None,
@ -305,6 +258,7 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
}.fake_sign(req.from); }.fake_sign(req.from);
self.prove_transaction(transaction, id) self.prove_transaction(transaction, id)
.map(|proof| ::request::ExecutionResponse { items: proof })
} }
fn ready_transactions(&self) -> Vec<PendingTransaction> { fn ready_transactions(&self) -> Vec<PendingTransaction> {
@ -347,27 +301,31 @@ impl<L: AsLightClient + Send + Sync> Provider for LightProvider<L> {
self.client.as_light_client().block_header(id) self.client.as_light_client().block_header(id)
} }
fn block_body(&self, _id: BlockId) -> Option<encoded::Body> { fn block_body(&self, _req: request::CompleteBodyRequest) -> Option<request::BodyResponse> {
None None
} }
fn block_receipts(&self, _hash: &H256) -> Option<Bytes> { fn block_receipts(&self, _req: request::CompleteReceiptsRequest) -> Option<request::ReceiptsResponse> {
None None
} }
fn state_proof(&self, _req: request::StateProof) -> Vec<Bytes> { fn account_proof(&self, _req: request::CompleteAccountRequest) -> Option<request::AccountResponse> {
Vec::new()
}
fn contract_code(&self, _req: request::ContractCode) -> Bytes {
Vec::new()
}
fn header_proof(&self, _req: request::HeaderProof) -> Option<(encoded::Header, Vec<Bytes>)> {
None None
} }
fn transaction_proof(&self, _req: request::TransactionProof) -> Option<Vec<DBValue>> { fn storage_proof(&self, _req: request::CompleteStorageRequest) -> Option<request::StorageResponse> {
None
}
fn contract_code(&self, _req: request::CompleteCodeRequest) -> Option<request::CodeResponse> {
None
}
fn header_proof(&self, _req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse> {
None
}
fn transaction_proof(&self, _req: request::CompleteExecutionRequest) -> Option<request::ExecutionResponse> {
None None
} }
@ -395,10 +353,8 @@ mod tests {
let client = TestBlockChainClient::new(); let client = TestBlockChainClient::new();
client.add_blocks(2000, EachBlockWith::Nothing); client.add_blocks(2000, EachBlockWith::Nothing);
let req = ::request::HeaderProof { let req = ::request::CompleteHeaderProofRequest {
cht_number: 0, num: 1500,
block_number: 1500,
from_level: 0,
}; };
assert!(client.header_proof(req.clone()).is_none()); assert!(client.header_proof(req.clone()).is_none());

View File

@ -1,228 +0,0 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! LES request types.
use ethcore::transaction::Action;
use util::{Address, H256, U256, Uint};
/// Either a hash or a number.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub enum HashOrNumber {
/// Block hash variant.
Hash(H256),
/// Block number variant.
Number(u64),
}
impl From<H256> for HashOrNumber {
fn from(hash: H256) -> Self {
HashOrNumber::Hash(hash)
}
}
impl From<u64> for HashOrNumber {
fn from(num: u64) -> Self {
HashOrNumber::Number(num)
}
}
/// A request for block headers.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub struct Headers {
/// Starting block number or hash.
pub start: HashOrNumber,
/// The maximum amount of headers which can be returned.
pub max: usize,
/// The amount of headers to skip between each response entry.
pub skip: u64,
/// Whether the headers should proceed in falling number from the initial block.
pub reverse: bool,
}
/// A request for specific block bodies.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub struct Bodies {
/// Hashes which bodies are being requested for.
pub block_hashes: Vec<H256>
}
/// A request for transaction receipts.
///
/// This request is answered with a list of transaction receipts for each block
/// requested.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub struct Receipts {
/// Block hashes to return receipts for.
pub block_hashes: Vec<H256>,
}
/// A request for a state proof
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub struct StateProof {
/// Block hash to query state from.
pub block: H256,
/// Key of the state trie -- corresponds to account hash.
pub key1: H256,
/// Key in that account's storage trie; if empty, then the account RLP should be
/// returned.
pub key2: Option<H256>,
/// if greater than zero, trie nodes beyond this level may be omitted.
pub from_level: u32, // could even safely be u8; trie w/ 32-byte key can be at most 64-levels deep.
}
/// A request for state proofs.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub struct StateProofs {
/// All the proof requests.
pub requests: Vec<StateProof>,
}
/// A request for contract code.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub struct ContractCode {
/// Block hash
pub block_hash: H256,
/// Account key (== sha3(address))
pub account_key: H256,
}
/// A request for contract code.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub struct ContractCodes {
/// Block hash and account key (== sha3(address)) pairs to fetch code for.
pub code_requests: Vec<ContractCode>,
}
/// A request for a header proof from the Canonical Hash Trie.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub struct HeaderProof {
/// Number of the CHT.
pub cht_number: u64,
/// Block number requested. May not be 0: genesis isn't included in any CHT.
pub block_number: u64,
/// If greater than zero, trie nodes beyond this level may be omitted.
pub from_level: u32,
}
/// A request for header proofs from the CHT.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub struct HeaderProofs {
/// All the proof requests.
pub requests: Vec<HeaderProof>,
}
/// A request for proof of (simulated) transaction execution.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub struct TransactionProof {
/// Block hash to request for.
pub at: H256,
/// Address to treat as the caller.
pub from: Address,
/// Action to take: either a call or a create.
pub action: Action,
/// Amount of gas to request proof-of-execution for.
pub gas: U256,
/// Price for each gas.
pub gas_price: U256,
/// Value to simulate sending.
pub value: U256,
/// Transaction data.
pub data: Vec<u8>,
}
/// Kinds of requests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub enum Kind {
/// Requesting headers.
Headers,
/// Requesting block bodies.
Bodies,
/// Requesting transaction receipts.
Receipts,
/// Requesting proofs of state trie nodes.
StateProofs,
/// Requesting contract code by hash.
Codes,
/// Requesting header proofs (from the CHT).
HeaderProofs,
/// Requesting proof of transaction execution.
TransactionProof,
}
/// Encompasses all possible types of requests in a single structure.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub enum Request {
/// Requesting headers.
Headers(Headers),
/// Requesting block bodies.
Bodies(Bodies),
/// Requesting transaction receipts.
Receipts(Receipts),
/// Requesting state proofs.
StateProofs(StateProofs),
/// Requesting contract codes.
Codes(ContractCodes),
/// Requesting header proofs.
HeaderProofs(HeaderProofs),
/// Requesting proof of transaction execution.
TransactionProof(TransactionProof),
}
impl Request {
/// Get the kind of request this is.
pub fn kind(&self) -> Kind {
match *self {
Request::Headers(_) => Kind::Headers,
Request::Bodies(_) => Kind::Bodies,
Request::Receipts(_) => Kind::Receipts,
Request::StateProofs(_) => Kind::StateProofs,
Request::Codes(_) => Kind::Codes,
Request::HeaderProofs(_) => Kind::HeaderProofs,
Request::TransactionProof(_) => Kind::TransactionProof,
}
}
/// Get the amount of requests being made.
/// In the case of `TransactionProof`, this is the amount of gas being requested.
pub fn amount(&self) -> usize {
match *self {
Request::Headers(ref req) => req.max,
Request::Bodies(ref req) => req.block_hashes.len(),
Request::Receipts(ref req) => req.block_hashes.len(),
Request::StateProofs(ref req) => req.requests.len(),
Request::Codes(ref req) => req.code_requests.len(),
Request::HeaderProofs(ref req) => req.requests.len(),
Request::TransactionProof(ref req) => match req.gas > usize::max_value().into() {
true => usize::max_value(),
false => req.gas.low_u64() as usize,
}
}
}
}

View File

@ -14,4 +14,4 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
pub mod les_request; pub mod request;

View File

@ -0,0 +1,190 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Request chain builder utility.
//! Push requests with `push`. Back-references and data required to verify responses must be
//! supplied as well.
use std::collections::HashMap;
use request::{
IncompleteRequest, CompleteRequest, Request,
OutputKind, Output, NoSuchOutput, Response, ResponseError,
};
/// Build chained requests. Push them onto the series with `push`,
/// and produce a `Requests` object with `build`. Outputs are checked for consistency.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RequestBuilder {
output_kinds: HashMap<(usize, usize), OutputKind>,
requests: Vec<Request>,
}
impl RequestBuilder {
/// Attempt to push a request onto the request chain. Fails if the request
/// references a non-existent output of a prior request.
pub fn push(&mut self, request: Request) -> Result<(), NoSuchOutput> {
request.check_outputs(|req, idx, kind| {
match self.output_kinds.get(&(req, idx)) {
Some(k) if k == &kind => Ok(()),
_ => Err(NoSuchOutput),
}
})?;
let req_idx = self.requests.len();
request.note_outputs(|idx, kind| { self.output_kinds.insert((req_idx, idx), kind); });
self.requests.push(request);
Ok(())
}
/// Get a reference to the output kinds map.
pub fn output_kinds(&self) -> &HashMap<(usize, usize), OutputKind> {
&self.output_kinds
}
/// Convert this into a "requests" object.
pub fn build(self) -> Requests {
Requests {
outputs: HashMap::new(),
requests: self.requests,
answered: 0,
}
}
}
/// Requests pending responses.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Requests {
outputs: HashMap<(usize, usize), Output>,
requests: Vec<Request>,
answered: usize,
}
impl Requests {
/// For each request, produce responses for each.
/// The responses vector produced goes up to the point where the responder
/// first returns `None`, an invalid response, or until all requests have been responded to.
pub fn respond_to_all<F>(mut self, responder: F) -> Vec<Response>
where F: Fn(CompleteRequest) -> Option<Response>
{
let mut responses = Vec::new();
while let Some(response) = self.next_complete().and_then(&responder) {
match self.supply_response(&response) {
Ok(()) => responses.push(response),
Err(e) => {
debug!(target: "pip", "produced bad response to request: {:?}", e);
return responses;
}
}
}
responses
}
/// Get access to the underlying slice of requests.
// TODO: unimplemented -> Vec<Request>, // do we _have to_ allocate?
pub fn requests(&self) -> &[Request] { &self.requests }
/// Get the number of answered requests.
pub fn num_answered(&self) -> usize { self.answered }
/// Get the next request as a filled request. Returns `None` when all requests answered.
pub fn next_complete(&self) -> Option<CompleteRequest> {
if self.answered == self.requests.len() {
None
} else {
Some(self.requests[self.answered].clone()
.complete()
.expect("All outputs checked as invariant of `Requests` object; qed"))
}
}
/// Supply a response for the next request.
/// Fails on: wrong request kind, all requests answered already.
pub fn supply_response(&mut self, response: &Response) -> Result<(), ResponseError> {
let idx = self.answered;
// check validity.
if idx == self.requests.len() { return Err(ResponseError::Unexpected) }
if self.requests[idx].kind() != response.kind() { return Err(ResponseError::WrongKind) }
let outputs = &mut self.outputs;
response.fill_outputs(|out_idx, output| {
// we don't need to check output kinds here because all back-references
// are validated in the builder.
// TODO: optimization for only storing outputs we "care about"?
outputs.insert((idx, out_idx), output);
});
self.answered += 1;
// fill as much of the next request as we can.
if let Some(ref mut req) = self.requests.get_mut(self.answered) {
req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput))
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use request::*;
use super::RequestBuilder;
use util::H256;
#[test]
fn all_scalar() {
let mut builder = RequestBuilder::default();
builder.push(Request::HeaderProof(IncompleteHeaderProofRequest {
num: 100.into(),
})).unwrap();
builder.push(Request::Receipts(IncompleteReceiptsRequest {
hash: H256::default().into(),
})).unwrap();
}
#[test]
#[should_panic]
fn missing_backref() {
let mut builder = RequestBuilder::default();
builder.push(Request::HeaderProof(IncompleteHeaderProofRequest {
num: Field::BackReference(100, 3),
})).unwrap();
}
#[test]
#[should_panic]
fn wrong_kind() {
let mut builder = RequestBuilder::default();
assert!(builder.push(Request::HeaderProof(IncompleteHeaderProofRequest {
num: 100.into(),
})).is_ok());
builder.push(Request::HeaderProof(IncompleteHeaderProofRequest {
num: Field::BackReference(0, 0),
})).unwrap();
}
#[test]
fn good_backreference() {
let mut builder = RequestBuilder::default();
builder.push(Request::HeaderProof(IncompleteHeaderProofRequest {
num: 100.into(), // header proof puts hash at output 0.
})).unwrap();
builder.push(Request::Receipts(IncompleteReceiptsRequest {
hash: Field::BackReference(0, 0),
})).unwrap();
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1607,23 +1607,14 @@ impl MayPanic for Client {
} }
impl ::client::ProvingBlockChainClient for Client { impl ::client::ProvingBlockChainClient for Client {
fn prove_storage(&self, key1: H256, key2: H256, from_level: u32, id: BlockId) -> Vec<Bytes> { fn prove_storage(&self, key1: H256, key2: H256, id: BlockId) -> Option<(Vec<Bytes>, H256)> {
self.state_at(id) self.state_at(id)
.and_then(move |state| state.prove_storage(key1, key2, from_level).ok()) .and_then(move |state| state.prove_storage(key1, key2).ok())
.unwrap_or_else(Vec::new)
} }
fn prove_account(&self, key1: H256, from_level: u32, id: BlockId) -> Vec<Bytes> { fn prove_account(&self, key1: H256, id: BlockId) -> Option<(Vec<Bytes>, ::types::basic_account::BasicAccount)> {
self.state_at(id) self.state_at(id)
.and_then(move |state| state.prove_account(key1, from_level).ok()) .and_then(move |state| state.prove_account(key1).ok())
.unwrap_or_else(Vec::new)
}
fn code_by_hash(&self, account_key: H256, id: BlockId) -> Bytes {
self.state_at(id)
.and_then(move |state| state.code_by_address_hash(account_key).ok())
.and_then(|x| x)
.unwrap_or_else(Vec::new)
} }
fn prove_transaction(&self, transaction: SignedTransaction, id: BlockId) -> Option<Vec<DBValue>> { fn prove_transaction(&self, transaction: SignedTransaction, id: BlockId) -> Option<Vec<DBValue>> {
@ -1643,7 +1634,6 @@ impl ::client::ProvingBlockChainClient for Client {
_ => return Some(state.drop().1.extract_proof()), _ => return Some(state.drop().1.extract_proof()),
} }
} }
} }
impl Drop for Client { impl Drop for Client {

View File

@ -38,6 +38,7 @@ use error::{ImportResult, Error as EthcoreError};
use evm::{Factory as EvmFactory, VMType, Schedule}; use evm::{Factory as EvmFactory, VMType, Schedule};
use miner::{Miner, MinerService, TransactionImportResult}; use miner::{Miner, MinerService, TransactionImportResult};
use spec::Spec; use spec::Spec;
use types::basic_account::BasicAccount;
use types::mode::Mode; use types::mode::Mode;
use types::pruning_info::PruningInfo; use types::pruning_info::PruningInfo;
@ -758,16 +759,12 @@ impl BlockChainClient for TestBlockChainClient {
} }
impl ProvingBlockChainClient for TestBlockChainClient { impl ProvingBlockChainClient for TestBlockChainClient {
fn prove_storage(&self, _: H256, _: H256, _: u32, _: BlockId) -> Vec<Bytes> { fn prove_storage(&self, _: H256, _: H256, _: BlockId) -> Option<(Vec<Bytes>, H256)> {
Vec::new() None
} }
fn prove_account(&self, _: H256, _: u32, _: BlockId) -> Vec<Bytes> { fn prove_account(&self, _: H256, _: BlockId) -> Option<(Vec<Bytes>, BasicAccount)> {
Vec::new() None
}
fn code_by_hash(&self, _: H256, _: BlockId) -> Bytes {
Vec::new()
} }
fn prove_transaction(&self, _: SignedTransaction, _: BlockId) -> Option<Vec<DBValue>> { fn prove_transaction(&self, _: SignedTransaction, _: BlockId) -> Option<Vec<DBValue>> {

View File

@ -34,6 +34,7 @@ use env_info::LastHashes;
use block_import_error::BlockImportError; use block_import_error::BlockImportError;
use ipc::IpcConfig; use ipc::IpcConfig;
use types::ids::*; use types::ids::*;
use types::basic_account::BasicAccount;
use types::trace_filter::Filter as TraceFilter; use types::trace_filter::Filter as TraceFilter;
use types::call_analytics::CallAnalytics; use types::call_analytics::CallAnalytics;
use types::blockchain_info::BlockChainInfo; use types::blockchain_info::BlockChainInfo;
@ -315,19 +316,12 @@ pub trait ProvingBlockChainClient: BlockChainClient {
/// ///
/// Both provided keys assume a secure trie. /// Both provided keys assume a secure trie.
/// Returns a vector of raw trie nodes (in order from the root) proving the storage query. /// Returns a vector of raw trie nodes (in order from the root) proving the storage query.
/// Nodes after `from_level` may be omitted. fn prove_storage(&self, key1: H256, key2: H256, id: BlockId) -> Option<(Vec<Bytes>, H256)>;
/// An empty vector indicates unservable query.
fn prove_storage(&self, key1: H256, key2: H256, from_level: u32, id: BlockId) -> Vec<Bytes>;
/// Prove account existence at a specific block id. /// Prove account existence at a specific block id.
/// The key is the keccak hash of the account's address. /// The key is the keccak hash of the account's address.
/// Returns a vector of raw trie nodes (in order from the root) proving the query. /// Returns a vector of raw trie nodes (in order from the root) proving the query.
/// Nodes after `from_level` may be omitted. fn prove_account(&self, key1: H256, id: BlockId) -> Option<(Vec<Bytes>, BasicAccount)>;
/// An empty vector indicates unservable query.
fn prove_account(&self, key1: H256, from_level: u32, id: BlockId) -> Vec<Bytes>;
/// Get code by address hash.
fn code_by_hash(&self, account_key: H256, id: BlockId) -> Bytes;
/// Prove execution of a transaction at the given block. /// Prove execution of a transaction at the given block.
fn prove_transaction(&self, transaction: SignedTransaction, id: BlockId) -> Option<Vec<DBValue>>; fn prove_transaction(&self, transaction: SignedTransaction, id: BlockId) -> Option<Vec<DBValue>>;

View File

@ -438,18 +438,19 @@ impl Account {
/// trie. /// trie.
/// `storage_key` is the hash of the desired storage key, meaning /// `storage_key` is the hash of the desired storage key, meaning
/// this will only work correctly under a secure trie. /// this will only work correctly under a secure trie.
/// Returns a merkle proof of the storage trie node with all nodes before `from_level` pub fn prove_storage(&self, db: &HashDB, storage_key: H256) -> Result<(Vec<Bytes>, H256), Box<TrieError>> {
/// omitted.
pub fn prove_storage(&self, db: &HashDB, storage_key: H256, from_level: u32) -> Result<Vec<Bytes>, Box<TrieError>> {
use util::trie::{Trie, TrieDB}; use util::trie::{Trie, TrieDB};
use util::trie::recorder::Recorder; use util::trie::recorder::Recorder;
let mut recorder = Recorder::with_depth(from_level); let mut recorder = Recorder::new();
let trie = TrieDB::new(db, &self.storage_root)?; let trie = TrieDB::new(db, &self.storage_root)?;
let _ = trie.get_with(&storage_key, &mut recorder)?; let item: U256 = {
let query = (&mut recorder, ::rlp::decode);
trie.get_with(&storage_key, query)?.unwrap_or_else(U256::zero)
};
Ok(recorder.drain().into_iter().map(|r| r.data).collect()) Ok((recorder.drain().into_iter().map(|r| r.data).collect(), item.into()))
} }
} }

View File

@ -31,6 +31,7 @@ use factory::Factories;
use trace::FlatTrace; use trace::FlatTrace;
use pod_account::*; use pod_account::*;
use pod_state::{self, PodState}; use pod_state::{self, PodState};
use types::basic_account::BasicAccount;
use types::executed::{Executed, ExecutionError}; use types::executed::{Executed, ExecutionError};
use types::state_diff::StateDiff; use types::state_diff::StateDiff;
use transaction::SignedTransaction; use transaction::SignedTransaction;
@ -857,47 +858,43 @@ impl<B: Backend> State<B> {
// State proof implementations; useful for light client protocols. // State proof implementations; useful for light client protocols.
impl<B: Backend> State<B> { impl<B: Backend> State<B> {
/// Prove an account's existence or nonexistence in the state trie. /// Prove an account's existence or nonexistence in the state trie.
/// Returns a merkle proof of the account's trie node with all nodes before `from_level` /// Returns a merkle proof of the account's trie node omitted or an encountered trie error.
/// omitted or an encountered trie error. /// If the account doesn't exist in the trie, prove that and return defaults.
/// Requires a secure trie to be used for accurate results. /// Requires a secure trie to be used for accurate results.
/// `account_key` == sha3(address) /// `account_key` == sha3(address)
pub fn prove_account(&self, account_key: H256, from_level: u32) -> trie::Result<Vec<Bytes>> { pub fn prove_account(&self, account_key: H256) -> trie::Result<(Vec<Bytes>, BasicAccount)> {
let mut recorder = Recorder::with_depth(from_level); let mut recorder = Recorder::new();
let trie = TrieDB::new(self.db.as_hashdb(), &self.root)?; let trie = TrieDB::new(self.db.as_hashdb(), &self.root)?;
trie.get_with(&account_key, &mut recorder)?; let maybe_account: Option<BasicAccount> = {
let query = (&mut recorder, ::rlp::decode);
trie.get_with(&account_key, query)?
};
let account = maybe_account.unwrap_or_else(|| BasicAccount {
balance: 0.into(),
nonce: self.account_start_nonce,
code_hash: SHA3_EMPTY,
storage_root: ::util::sha3::SHA3_NULL_RLP,
});
Ok(recorder.drain().into_iter().map(|r| r.data).collect()) Ok((recorder.drain().into_iter().map(|r| r.data).collect(), account))
} }
/// Prove an account's storage key's existence or nonexistence in the state. /// Prove an account's storage key's existence or nonexistence in the state.
/// Returns a merkle proof of the account's storage trie with all nodes before /// Returns a merkle proof of the account's storage trie.
/// `from_level` omitted. Requires a secure trie to be used for correctness. /// Requires a secure trie to be used for correctness.
/// `account_key` == sha3(address) /// `account_key` == sha3(address)
/// `storage_key` == sha3(key) /// `storage_key` == sha3(key)
pub fn prove_storage(&self, account_key: H256, storage_key: H256, from_level: u32) -> trie::Result<Vec<Bytes>> { pub fn prove_storage(&self, account_key: H256, storage_key: H256) -> trie::Result<(Vec<Bytes>, H256)> {
// TODO: probably could look into cache somehow but it's keyed by // TODO: probably could look into cache somehow but it's keyed by
// address, not sha3(address). // address, not sha3(address).
let trie = TrieDB::new(self.db.as_hashdb(), &self.root)?; let trie = TrieDB::new(self.db.as_hashdb(), &self.root)?;
let acc = match trie.get_with(&account_key, Account::from_rlp)? { let acc = match trie.get_with(&account_key, Account::from_rlp)? {
Some(acc) => acc, Some(acc) => acc,
None => return Ok(Vec::new()), None => return Ok((Vec::new(), H256::new())),
}; };
let account_db = self.factories.accountdb.readonly(self.db.as_hashdb(), account_key); let account_db = self.factories.accountdb.readonly(self.db.as_hashdb(), account_key);
acc.prove_storage(account_db.as_hashdb(), storage_key, from_level) acc.prove_storage(account_db.as_hashdb(), storage_key)
}
/// Get code by address hash.
/// Only works when backed by a secure trie.
pub fn code_by_address_hash(&self, account_key: H256) -> trie::Result<Option<Bytes>> {
let trie = TrieDB::new(self.db.as_hashdb(), &self.root)?;
let mut acc = match trie.get_with(&account_key, Account::from_rlp)? {
Some(acc) => acc,
None => return Ok(None),
};
let account_db = self.factories.accountdb.readonly(self.db.as_hashdb(), account_key);
Ok(acc.cache_code(account_db.as_hashdb()).map(|c| (&*c).clone()))
} }
} }

View File

@ -268,7 +268,10 @@ impl LightDispatcher {
})); }));
match nonce_future { match nonce_future {
Some(x) => x.map(|acc| acc.nonce).map_err(|_| errors::no_light_peers()).boxed(), Some(x) =>
x.map(|acc| acc.map_or_else(Default::default, |acc| acc.nonce))
.map_err(|_| errors::no_light_peers())
.boxed(),
None => future::err(errors::network_disabled()).boxed() None => future::err(errors::network_disabled()).boxed()
} }
} }

View File

@ -105,15 +105,22 @@ impl EthClient {
match cht_root { match cht_root {
None => return future::ok(None).boxed(), None => return future::ok(None).boxed(),
Some(root) => { Some(root) => {
let req = request::HeaderByNumber::new(n, root) let req = request::HeaderProof::new(n, root)
.expect("only fails for 0; client always stores genesis; client already queried; qed"); .expect("only fails for 0; client always stores genesis; client already queried; qed");
self.sync.with_context(|ctx| let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.on_demand.header_by_number(ctx, req) self.sync.with_context(|ctx| {
.map(Some) let fut = self.on_demand.hash_by_number(ctx, req)
.map_err(err_premature_cancel) .map(request::HeaderByHash)
.boxed() .map_err(err_premature_cancel);
)
fut.and_then(move |req| {
match sync.with_context(|ctx| on_demand.header_by_hash(ctx, req)) {
Some(fut) => fut.map_err(err_premature_cancel).boxed(),
None => future::err(errors::network_disabled()).boxed(),
}
}).map(Some).boxed()
})
} }
} }
} }
@ -149,7 +156,7 @@ impl EthClient {
sync.with_context(|ctx| on_demand.account(ctx, request::Account { sync.with_context(|ctx| on_demand.account(ctx, request::Account {
header: header, header: header,
address: address, address: address,
}).map(Some)) }))
.map(|x| x.map_err(err_premature_cancel).boxed()) .map(|x| x.map_err(err_premature_cancel).boxed())
.unwrap_or_else(|| future::err(errors::network_disabled()).boxed()) .unwrap_or_else(|| future::err(errors::network_disabled()).boxed())
}).boxed() }).boxed()

View File

@ -16,7 +16,7 @@
//! Light client synchronization. //! Light client synchronization.
//! //!
//! This will synchronize the header chain using LES messages. //! This will synchronize the header chain using PIP messages.
//! Dataflow is largely one-directional as headers are pushed into //! Dataflow is largely one-directional as headers are pushed into
//! the light client queue for import. Where possible, they are batched //! the light client queue for import. Where possible, they are batched
//! in groups. //! in groups.
@ -36,14 +36,15 @@ use std::collections::HashMap;
use std::mem; use std::mem;
use std::sync::Arc; use std::sync::Arc;
use ethcore::encoded;
use light::client::{AsLightClient, LightChainClient}; use light::client::{AsLightClient, LightChainClient};
use light::net::{ use light::net::{
Announcement, Handler, BasicContext, EventContext, Announcement, Handler, BasicContext, EventContext,
Capabilities, ReqId, Status, Capabilities, ReqId, Status, Error as NetError,
}; };
use light::request; use light::request::{self, CompleteHeadersRequest as HeadersRequest};
use network::PeerId; use network::PeerId;
use util::{Bytes, U256, H256, Mutex, RwLock}; use util::{U256, H256, Mutex, RwLock};
use rand::{Rng, OsRng}; use rand::{Rng, OsRng};
use self::sync_round::{AbortReason, SyncRound, ResponseContext}; use self::sync_round::{AbortReason, SyncRound, ResponseContext};
@ -91,7 +92,7 @@ impl Peer {
#[derive(Debug)] #[derive(Debug)]
enum AncestorSearch { enum AncestorSearch {
Queued(u64), // queued to search for blocks starting from here. Queued(u64), // queued to search for blocks starting from here.
Awaiting(ReqId, u64, request::Headers), // awaiting response for this request. Awaiting(ReqId, u64, HeadersRequest), // awaiting response for this request.
Prehistoric, // prehistoric block found. TODO: start to roll back CHTs. Prehistoric, // prehistoric block found. TODO: start to roll back CHTs.
FoundCommon(u64, H256), // common block found. FoundCommon(u64, H256), // common block found.
Genesis, // common ancestor is the genesis. Genesis, // common ancestor is the genesis.
@ -113,7 +114,7 @@ impl AncestorSearch {
match self { match self {
AncestorSearch::Awaiting(id, start, req) => { AncestorSearch::Awaiting(id, start, req) => {
if &id == ctx.req_id() { if &id == ctx.req_id() {
match response::decode_and_verify(ctx.data(), &req) { match response::verify(ctx.data(), &req) {
Ok(headers) => { Ok(headers) => {
for header in &headers { for header in &headers {
if client.is_known(&header.hash()) { if client.is_known(&header.hash()) {
@ -150,17 +151,17 @@ impl AncestorSearch {
} }
fn dispatch_request<F>(self, mut dispatcher: F) -> AncestorSearch fn dispatch_request<F>(self, mut dispatcher: F) -> AncestorSearch
where F: FnMut(request::Headers) -> Option<ReqId> where F: FnMut(HeadersRequest) -> Option<ReqId>
{ {
const BATCH_SIZE: usize = 64; const BATCH_SIZE: u64 = 64;
match self { match self {
AncestorSearch::Queued(start) => { AncestorSearch::Queued(start) => {
let batch_size = ::std::cmp::min(start as usize, BATCH_SIZE); let batch_size = ::std::cmp::min(start, BATCH_SIZE);
trace!(target: "sync", "Requesting {} reverse headers from {} to find common ancestor", trace!(target: "sync", "Requesting {} reverse headers from {} to find common ancestor",
batch_size, start); batch_size, start);
let req = request::Headers { let req = HeadersRequest {
start: start.into(), start: start.into(),
max: batch_size, max: batch_size,
skip: 0, skip: 0,
@ -193,13 +194,13 @@ struct ResponseCtx<'a> {
peer: PeerId, peer: PeerId,
req_id: ReqId, req_id: ReqId,
ctx: &'a BasicContext, ctx: &'a BasicContext,
data: &'a [Bytes], data: &'a [encoded::Header],
} }
impl<'a> ResponseContext for ResponseCtx<'a> { impl<'a> ResponseContext for ResponseCtx<'a> {
fn responder(&self) -> PeerId { self.peer } fn responder(&self) -> PeerId { self.peer }
fn req_id(&self) -> &ReqId { &self.req_id } fn req_id(&self) -> &ReqId { &self.req_id }
fn data(&self) -> &[Bytes] { self.data } fn data(&self) -> &[encoded::Header] { self.data }
fn punish_responder(&self) { self.ctx.disable_peer(self.peer) } fn punish_responder(&self) { self.ctx.disable_peer(self.peer) }
} }
@ -313,11 +314,22 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
self.maintain_sync(ctx.as_basic()); self.maintain_sync(ctx.as_basic());
} }
fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) { fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[request::Response]) {
if !self.peers.read().contains_key(&ctx.peer()) { let peer = ctx.peer();
if !self.peers.read().contains_key(&peer) {
return return
} }
let headers = match responses.get(0) {
Some(&request::Response::Headers(ref response)) => &response.headers[..],
Some(_) => {
trace!("Disabling peer {} for wrong response type.", peer);
ctx.disable_peer(peer);
&[]
}
None => &[],
};
{ {
let mut state = self.state.lock(); let mut state = self.state.lock();
@ -465,18 +477,27 @@ impl<L: AsLightClient> LightSync<L> {
// naive request dispatcher: just give to any peer which says it will // naive request dispatcher: just give to any peer which says it will
// give us responses. // give us responses.
let dispatcher = move |req: request::Headers| { let dispatcher = move |req: HeadersRequest| {
rng.shuffle(&mut peer_ids); rng.shuffle(&mut peer_ids);
let request = {
let mut builder = request::RequestBuilder::default();
builder.push(request::Request::Headers(request::IncompleteHeadersRequest {
start: req.start.into(),
skip: req.skip,
max: req.max,
reverse: req.reverse,
})).expect("request provided fully complete with no unresolved back-references; qed");
builder.build()
};
for peer in &peer_ids { for peer in &peer_ids {
if ctx.max_requests(*peer, request::Kind::Headers) >= req.max { match ctx.request_from(*peer, request.clone()) {
match ctx.request_from(*peer, request::Request::Headers(req.clone())) { Ok(id) => {
Ok(id) => { return Some(id)
return Some(id)
}
Err(e) =>
trace!(target: "sync", "Error requesting headers from viable peer: {}", e),
} }
Err(NetError::NoCredits) => {}
Err(e) =>
trace!(target: "sync", "Error requesting headers from viable peer: {}", e),
} }
} }

View File

@ -18,10 +18,11 @@
use std::fmt; use std::fmt;
use ethcore::encoded;
use ethcore::header::Header; use ethcore::header::Header;
use light::request::{HashOrNumber, Headers as HeadersRequest}; use light::request::{HashOrNumber, CompleteHeadersRequest as HeadersRequest};
use rlp::{DecoderError, UntrustedRlp}; use rlp::DecoderError;
use util::{Bytes, H256}; use util::H256;
/// Errors found when decoding headers and verifying with basic constraints. /// Errors found when decoding headers and verifying with basic constraints.
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -71,13 +72,13 @@ pub trait Constraint {
fn verify(&self, headers: &[Header], reverse: bool) -> Result<(), Self::Error>; fn verify(&self, headers: &[Header], reverse: bool) -> Result<(), Self::Error>;
} }
/// Decode a response and do basic verification against a request. /// Do basic verification of provided headers against a request.
pub fn decode_and_verify(headers: &[Bytes], request: &HeadersRequest) -> Result<Vec<Header>, BasicError> { pub fn verify(headers: &[encoded::Header], request: &HeadersRequest) -> Result<Vec<Header>, BasicError> {
let headers: Vec<_> = try!(headers.iter().map(|x| UntrustedRlp::new(&x).as_val()).collect()); let headers: Vec<_> = headers.iter().map(|h| h.decode()).collect();
let reverse = request.reverse; let reverse = request.reverse;
try!(Max(request.max).verify(&headers, reverse)); try!(Max(request.max as usize).verify(&headers, reverse));
match request.start { match request.start {
HashOrNumber::Number(ref num) => try!(StartsAtNumber(*num).verify(&headers, reverse)), HashOrNumber::Number(ref num) => try!(StartsAtNumber(*num).verify(&headers, reverse)),
HashOrNumber::Hash(ref hash) => try!(StartsAtHash(*hash).verify(&headers, reverse)), HashOrNumber::Hash(ref hash) => try!(StartsAtHash(*hash).verify(&headers, reverse)),
@ -150,8 +151,9 @@ impl Constraint for Max {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use ethcore::encoded;
use ethcore::header::Header; use ethcore::header::Header;
use light::request::Headers as HeadersRequest; use light::request::CompleteHeadersRequest as HeadersRequest;
use super::*; use super::*;
@ -175,10 +177,10 @@ mod tests {
parent_hash = Some(header.hash()); parent_hash = Some(header.hash());
::rlp::encode(&header).to_vec() encoded::Header::new(::rlp::encode(&header).to_vec())
}).collect(); }).collect();
assert!(decode_and_verify(&headers, &request).is_ok()); assert!(verify(&headers, &request).is_ok());
} }
#[test] #[test]
@ -201,10 +203,10 @@ mod tests {
parent_hash = Some(header.hash()); parent_hash = Some(header.hash());
::rlp::encode(&header).to_vec() encoded::Header::new(::rlp::encode(&header).to_vec())
}).collect(); }).collect();
assert!(decode_and_verify(&headers, &request).is_ok()); assert!(verify(&headers, &request).is_ok());
} }
#[test] #[test]
@ -227,10 +229,10 @@ mod tests {
parent_hash = Some(header.hash()); parent_hash = Some(header.hash());
::rlp::encode(&header).to_vec() encoded::Header::new(::rlp::encode(&header).to_vec())
}).collect(); }).collect();
assert_eq!(decode_and_verify(&headers, &request), Err(BasicError::TooManyHeaders(20, 25))); assert_eq!(verify(&headers, &request), Err(BasicError::TooManyHeaders(20, 25)));
} }
#[test] #[test]
@ -246,9 +248,9 @@ mod tests {
let mut header = Header::default(); let mut header = Header::default();
header.set_number(x); header.set_number(x);
::rlp::encode(&header).to_vec() encoded::Header::new(::rlp::encode(&header).to_vec())
}).collect(); }).collect();
assert_eq!(decode_and_verify(&headers, &request), Err(BasicError::WrongSkip(5, Some(2)))); assert_eq!(verify(&headers, &request), Err(BasicError::WrongSkip(5, Some(2))));
} }
} }

View File

@ -20,13 +20,14 @@ use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::fmt; use std::fmt;
use ethcore::encoded;
use ethcore::header::Header; use ethcore::header::Header;
use light::net::ReqId; use light::net::ReqId;
use light::request::Headers as HeadersRequest; use light::request::CompleteHeadersRequest as HeadersRequest;
use network::PeerId; use network::PeerId;
use util::{Bytes, H256}; use util::H256;
use super::response; use super::response;
@ -40,7 +41,7 @@ pub trait ResponseContext {
/// Get the request ID this response corresponds to. /// Get the request ID this response corresponds to.
fn req_id(&self) -> &ReqId; fn req_id(&self) -> &ReqId;
/// Get the (unverified) response data. /// Get the (unverified) response data.
fn data(&self) -> &[Bytes]; fn data(&self) -> &[encoded::Header];
/// Punish the responder. /// Punish the responder.
fn punish_responder(&self); fn punish_responder(&self);
} }
@ -114,7 +115,7 @@ impl Fetcher {
let needed_headers = HeadersRequest { let needed_headers = HeadersRequest {
start: high_rung.parent_hash().clone().into(), start: high_rung.parent_hash().clone().into(),
max: diff as usize - 1, max: diff - 1,
skip: 0, skip: 0,
reverse: true, reverse: true,
}; };
@ -190,7 +191,7 @@ impl Fetcher {
return SyncRound::Fetch(self); return SyncRound::Fetch(self);
} }
match response::decode_and_verify(headers, &request.headers_request) { match response::verify(headers, &request.headers_request) {
Err(e) => { Err(e) => {
trace!(target: "sync", "Punishing peer {} for invalid response ({})", ctx.responder(), e); trace!(target: "sync", "Punishing peer {} for invalid response ({})", ctx.responder(), e);
ctx.punish_responder(); ctx.punish_responder();
@ -286,21 +287,21 @@ impl Fetcher {
} }
// Compute scaffold parameters from non-zero distance between start and target block: (skip, pivots). // Compute scaffold parameters from non-zero distance between start and target block: (skip, pivots).
fn scaffold_params(diff: u64) -> (u64, usize) { fn scaffold_params(diff: u64) -> (u64, u64) {
// default parameters. // default parameters.
// amount of blocks between each scaffold pivot. // amount of blocks between each scaffold pivot.
const ROUND_SKIP: u64 = 255; const ROUND_SKIP: u64 = 255;
// amount of scaffold pivots: these are the Xs in "X___X___X" // amount of scaffold pivots: these are the Xs in "X___X___X"
const ROUND_PIVOTS: usize = 256; const ROUND_PIVOTS: u64 = 256;
let rem = diff % (ROUND_SKIP + 1); let rem = diff % (ROUND_SKIP + 1);
if diff <= ROUND_SKIP { if diff <= ROUND_SKIP {
// just request headers from the start to the target. // just request headers from the start to the target.
(0, rem as usize) (0, rem)
} else { } else {
// the number of pivots necessary to exactly hit or overshoot the target. // the number of pivots necessary to exactly hit or overshoot the target.
let pivots_to_target = (diff / (ROUND_SKIP + 1)) + if rem == 0 { 0 } else { 1 }; let pivots_to_target = (diff / (ROUND_SKIP + 1)) + if rem == 0 { 0 } else { 1 };
let num_pivots = ::std::cmp::min(pivots_to_target, ROUND_PIVOTS as u64) as usize; let num_pivots = ::std::cmp::min(pivots_to_target, ROUND_PIVOTS);
(ROUND_SKIP, num_pivots) (ROUND_SKIP, num_pivots)
} }
} }
@ -319,7 +320,7 @@ pub struct RoundStart {
contributors: HashSet<PeerId>, contributors: HashSet<PeerId>,
attempt: usize, attempt: usize,
skip: u64, skip: u64,
pivots: usize, pivots: u64,
} }
impl RoundStart { impl RoundStart {
@ -372,7 +373,7 @@ impl RoundStart {
} }
}; };
match response::decode_and_verify(ctx.data(), &req) { match response::verify(ctx.data(), &req) {
Ok(headers) => { Ok(headers) => {
if self.sparse_headers.len() == 0 if self.sparse_headers.len() == 0
&& headers.get(0).map_or(false, |x| x.parent_hash() != &self.start_block.1) { && headers.get(0).map_or(false, |x| x.parent_hash() != &self.start_block.1) {
@ -383,7 +384,7 @@ impl RoundStart {
self.contributors.insert(ctx.responder()); self.contributors.insert(ctx.responder());
self.sparse_headers.extend(headers); self.sparse_headers.extend(headers);
if self.sparse_headers.len() == self.pivots { if self.sparse_headers.len() as u64 == self.pivots {
return if self.skip == 0 { return if self.skip == 0 {
SyncRound::abort(AbortReason::TargetReached, self.sparse_headers.into()) SyncRound::abort(AbortReason::TargetReached, self.sparse_headers.into())
} else { } else {
@ -429,7 +430,7 @@ impl RoundStart {
let start = (self.start_block.0 + 1) let start = (self.start_block.0 + 1)
+ self.sparse_headers.len() as u64 * (self.skip + 1); + self.sparse_headers.len() as u64 * (self.skip + 1);
let max = self.pivots - self.sparse_headers.len(); let max = self.pivots - self.sparse_headers.len() as u64;
let headers_request = HeadersRequest { let headers_request = HeadersRequest {
start: start.into(), start: start.into(),

View File

@ -28,6 +28,7 @@ use io::IoChannel;
use light::client::Client as LightClient; use light::client::Client as LightClient;
use light::net::{LightProtocol, IoContext, Capabilities, Params as LightParams}; use light::net::{LightProtocol, IoContext, Capabilities, Params as LightParams};
use light::net::request_credits::FlowParams; use light::net::request_credits::FlowParams;
use light::provider::LightProvider;
use network::{NodeId, PeerId}; use network::{NodeId, PeerId};
use util::RwLock; use util::RwLock;
@ -71,7 +72,7 @@ enum PeerData {
} }
// test peer type. // test peer type.
// Either a full peer or a LES peer. // Either a full peer or a light peer.
pub struct Peer { pub struct Peer {
proto: LightProtocol, proto: LightProtocol,
queue: RwLock<VecDeque<TestPacket>>, queue: RwLock<VecDeque<TestPacket>>,
@ -115,7 +116,8 @@ impl Peer {
}, },
}; };
let mut proto = LightProtocol::new(chain.clone(), params); let provider = LightProvider::new(chain.clone(), Arc::new(RwLock::new(Default::default())));
let mut proto = LightProtocol::new(Arc::new(provider), params);
proto.add_handler(sync.clone()); proto.add_handler(sync.clone());
Peer { Peer {
proto: proto, proto: proto,