Merge pull request #5419 from paritytech/on-demand-priority

Improve on-demand dispatch and add support for batch requests
This commit is contained in:
Robert Habermeier 2017-05-17 12:28:27 +02:00 committed by GitHub
commit 5d973f8ef5
20 changed files with 1415 additions and 550 deletions

View File

@ -20,7 +20,7 @@ use network::{NetworkContext, PeerId, NodeId};
use super::{Announcement, LightProtocol, ReqId}; use super::{Announcement, LightProtocol, ReqId};
use super::error::Error; use super::error::Error;
use request::Requests; use request::NetworkRequests as Requests;
/// An I/O context which allows sending and receiving packets as well as /// An I/O context which allows sending and receiving packets as well as
/// disconnecting peers. This is used as a generalization of the portions /// disconnecting peers. This is used as a generalization of the portions

View File

@ -33,7 +33,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use provider::Provider; use provider::Provider;
use request::{Request, Requests, Response}; use request::{Request, NetworkRequests as Requests, Response};
use self::request_credits::{Credits, FlowParams}; use self::request_credits::{Credits, FlowParams};
use self::context::{Ctx, TickCtx}; use self::context::{Ctx, TickCtx};
@ -108,9 +108,14 @@ mod timeout {
} }
/// A request id. /// A request id.
#[cfg(not(test))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct ReqId(usize); pub struct ReqId(usize);
#[cfg(test)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct ReqId(pub usize);
impl fmt::Display for ReqId { impl fmt::Display for ReqId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Request #{}", self.0) write!(f, "Request #{}", self.0)

View File

@ -25,7 +25,7 @@ use std::collections::{BTreeMap, HashMap};
use std::iter::FromIterator; use std::iter::FromIterator;
use request::Request; use request::Request;
use request::Requests; use request::NetworkRequests as Requests;
use net::{timeout, ReqId}; use net::{timeout, ReqId};
use util::U256; use util::U256;

View File

@ -39,14 +39,14 @@ use std::sync::Arc;
// helper for encoding a single request into a packet. // helper for encoding a single request into a packet.
// panics on bad backreference. // panics on bad backreference.
fn encode_single(request: Request) -> Requests { fn encode_single(request: Request) -> NetworkRequests {
let mut builder = RequestBuilder::default(); let mut builder = RequestBuilder::default();
builder.push(request).unwrap(); builder.push(request).unwrap();
builder.build() builder.build()
} }
// helper for making a packet out of `Requests`. // helper for making a packet out of `Requests`.
fn make_packet(req_id: usize, requests: &Requests) -> Vec<u8> { fn make_packet(req_id: usize, requests: &NetworkRequests) -> Vec<u8> {
let mut stream = RlpStream::new_list(2); let mut stream = RlpStream::new_list(2);
stream.append(&req_id).append_list(&requests.requests()); stream.append(&req_id).append_list(&requests.requests());
stream.out() stream.out()

View File

@ -18,20 +18,17 @@
//! The request service is implemented using Futures. Higher level request handlers //! The request service is implemented using Futures. Higher level request handlers
//! will take the raw data received here and extract meaningful results from it. //! will take the raw data received here and extract meaningful results from it.
// TODO [ToDr] Suppressing deprecation warnings. Rob will fix the API anyway.
#![allow(deprecated)]
use std::collections::HashMap; use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use ethcore::basic_account::BasicAccount; use ethcore::basic_account::BasicAccount;
use ethcore::encoded; use ethcore::encoded;
use ethcore::receipt::Receipt; use ethcore::receipt::Receipt;
use ethcore::state::ProvedExecution;
use ethcore::executed::{Executed, ExecutionError}; use ethcore::executed::{Executed, ExecutionError};
use futures::{Async, Poll, Future}; use futures::{future, Async, Poll, Future, BoxFuture};
use futures::sync::oneshot::{self, Sender, Receiver}; use futures::sync::oneshot::{self, Sender, Receiver, Canceled};
use network::PeerId; use network::PeerId;
use rlp::RlpStream; use rlp::RlpStream;
use util::{Bytes, RwLock, Mutex, U256, H256}; use util::{Bytes, RwLock, Mutex, U256, H256};
@ -39,10 +36,19 @@ use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY, SHA3_EMPTY_LIST_RLP};
use net::{self, Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; use net::{self, Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId};
use cache::Cache; use cache::Cache;
use request::{self as basic_request, Request as NetworkRequest, Response as NetworkResponse}; use request::{self as basic_request, Request as NetworkRequest};
use self::request::CheckedRequest;
pub use self::request::{Request, Response};
#[cfg(test)]
mod tests;
pub mod request; pub mod request;
/// The result of execution
pub type ExecutionResult = Result<Executed, ExecutionError>;
// relevant peer info. // relevant peer info.
struct Peer { struct Peer {
status: Status, status: Status,
@ -50,146 +56,154 @@ struct Peer {
} }
impl Peer { impl Peer {
// Whether a given peer can handle a specific request. // whether this peer can fulfill the
fn can_handle(&self, pending: &Pending) -> bool { fn can_fulfill(&self, c: &Capabilities) -> bool {
match *pending { let caps = &self.capabilities;
Pending::HeaderProof(ref req, _) =>
self.capabilities.serve_headers && self.status.head_num > req.num(),
Pending::HeaderByHash(_, _) => self.capabilities.serve_headers,
Pending::Block(ref req, _) =>
self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x <= req.header.number()),
Pending::BlockReceipts(ref req, _) =>
self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x <= req.0.number()),
Pending::Account(ref req, _) =>
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.header.number()),
Pending::Code(ref req, _) =>
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.block_id.1),
Pending::TxProof(ref req, _) =>
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.header.number()),
}
}
}
// Which portions of a CHT proof should be sent. caps.serve_headers == c.serve_headers &&
enum ChtProofSender { caps.serve_chain_since >= c.serve_chain_since &&
Both(Sender<(H256, U256)>), caps.serve_state_since >= c.serve_chain_since
Hash(Sender<H256>), }
ChainScore(Sender<U256>),
} }
// Attempted request info and sender to put received value. // Attempted request info and sender to put received value.
enum Pending { struct Pending {
HeaderProof(request::HeaderProof, ChtProofSender), requests: basic_request::Requests<CheckedRequest>,
HeaderByHash(request::HeaderByHash, Sender<encoded::Header>), net_requests: basic_request::Requests<NetworkRequest>,
Block(request::Body, Sender<encoded::Block>), required_capabilities: Capabilities,
BlockReceipts(request::BlockReceipts, Sender<Vec<Receipt>>), responses: Vec<Response>,
Account(request::Account, Sender<BasicAccount>), sender: oneshot::Sender<Vec<Response>>,
Code(request::Code, Sender<Bytes>),
TxProof(request::TransactionProof, Sender<Result<Executed, ExecutionError>>),
} }
impl Pending { // helper to guess capabilities required for a given batch of network requests.
// Create a network request. fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities {
fn make_request(&self) -> NetworkRequest { let mut caps = Capabilities {
match *self { serve_headers: false,
Pending::HeaderByHash(ref req, _) => NetworkRequest::Headers(basic_request::IncompleteHeadersRequest { serve_chain_since: None,
start: basic_request::HashOrNumber::Hash(req.0).into(), serve_state_since: None,
skip: 0, tx_relay: false,
max: 1, };
reverse: false,
}), let update_since = |current: &mut Option<u64>, new|
Pending::HeaderProof(ref req, _) => NetworkRequest::HeaderProof(basic_request::IncompleteHeaderProofRequest { *current = match *current {
num: req.num().into(), Some(x) => Some(::std::cmp::min(x, new)),
}), None => Some(new),
Pending::Block(ref req, _) => NetworkRequest::Body(basic_request::IncompleteBodyRequest { };
hash: req.hash.into(),
}), for request in requests {
Pending::BlockReceipts(ref req, _) => NetworkRequest::Receipts(basic_request::IncompleteReceiptsRequest { match *request {
hash: req.0.hash().into(), // TODO: might be worth returning a required block number for this also.
}), CheckedRequest::HeaderProof(_, _) =>
Pending::Account(ref req, _) => NetworkRequest::Account(basic_request::IncompleteAccountRequest { caps.serve_headers = true,
block_hash: req.header.hash().into(), CheckedRequest::HeaderByHash(_, _) =>
address_hash: ::util::Hashable::sha3(&req.address).into(), caps.serve_headers = true,
}), CheckedRequest::Body(ref req, _) =>
Pending::Code(ref req, _) => NetworkRequest::Code(basic_request::IncompleteCodeRequest { update_since(&mut caps.serve_chain_since, req.header.number()),
block_hash: req.block_id.0.into(), CheckedRequest::Receipts(ref req, _) =>
code_hash: req.code_hash.into(), update_since(&mut caps.serve_chain_since, req.0.number()),
}), CheckedRequest::Account(ref req, _) =>
Pending::TxProof(ref req, _) => NetworkRequest::Execution(basic_request::IncompleteExecutionRequest { update_since(&mut caps.serve_state_since, req.header.number()),
block_hash: req.header.hash().into(), CheckedRequest::Code(ref req, _) =>
from: req.tx.sender(), update_since(&mut caps.serve_state_since, req.block_id.1),
gas: req.tx.gas, CheckedRequest::Execution(ref req, _) =>
gas_price: req.tx.gas_price, update_since(&mut caps.serve_state_since, req.header.number()),
action: req.tx.action.clone(),
value: req.tx.value,
data: req.tx.data.clone(),
}),
} }
} }
caps
}
/// A future extracting the concrete output type of the generic adapter
/// from a vector of responses.
pub struct OnResponses<T: request::RequestAdapter> {
receiver: Receiver<Vec<Response>>,
_marker: PhantomData<T>,
}
impl<T: request::RequestAdapter> Future for OnResponses<T> {
type Item = T::Out;
type Error = Canceled;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.receiver.poll().map(|async| async.map(T::extract_from))
}
} }
/// On demand request service. See module docs for more details. /// On demand request service. See module docs for more details.
/// Accumulates info about all peers' capabilities and dispatches /// Accumulates info about all peers' capabilities and dispatches
/// requests to them accordingly. /// requests to them accordingly.
// lock in declaration order.
pub struct OnDemand { pub struct OnDemand {
pending: RwLock<Vec<Pending>>,
peers: RwLock<HashMap<PeerId, Peer>>, peers: RwLock<HashMap<PeerId, Peer>>,
pending_requests: RwLock<HashMap<ReqId, Pending>>, in_transit: RwLock<HashMap<ReqId, Pending>>,
cache: Arc<Mutex<Cache>>, cache: Arc<Mutex<Cache>>,
orphaned_requests: RwLock<Vec<Pending>>, no_immediate_dispatch: bool,
start_nonce: U256,
} }
const RECEIVER_IN_SCOPE: &'static str = "Receiver is still in scope, so it's not dropped; qed";
impl OnDemand { impl OnDemand {
/// Create a new `OnDemand` service with the given cache. /// Create a new `OnDemand` service with the given cache.
pub fn new(cache: Arc<Mutex<Cache>>, account_start_nonce: U256) -> Self { pub fn new(cache: Arc<Mutex<Cache>>) -> Self {
OnDemand { OnDemand {
pending: RwLock::new(Vec::new()),
peers: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()),
pending_requests: RwLock::new(HashMap::new()), in_transit: RwLock::new(HashMap::new()),
cache: cache, cache: cache,
orphaned_requests: RwLock::new(Vec::new()), no_immediate_dispatch: true,
start_nonce: account_start_nonce,
} }
} }
// make a test version: this doesn't dispatch pending requests
// until you trigger it manually.
#[cfg(test)]
fn new_test(cache: Arc<Mutex<Cache>>) -> Self {
let mut me = OnDemand::new(cache);
me.no_immediate_dispatch = true;
me
}
/// Request a header's hash by block number and CHT root hash. /// Request a header's hash by block number and CHT root hash.
/// Returns the hash. /// Returns the hash.
pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<H256> { pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture<H256, Canceled> {
let (sender, receiver) = oneshot::channel();
let cached = { let cached = {
let mut cache = self.cache.lock(); let mut cache = self.cache.lock();
cache.block_hash(&req.num()) cache.block_hash(&req.num())
}; };
match cached { match cached {
Some(hash) => sender.send(hash).expect(RECEIVER_IN_SCOPE), Some(hash) => future::ok(hash).boxed(),
None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Hash(sender))), None => {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.map(|(h, _)| h)
.boxed()
},
} }
receiver
} }
/// Request a canonical block's chain score. /// Request a canonical block's chain score.
/// Returns the chain score. /// Returns the chain score.
pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<U256> { pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture<U256, Canceled> {
let (sender, receiver) = oneshot::channel();
let cached = { let cached = {
let mut cache = self.cache.lock(); let mut cache = self.cache.lock();
cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash)) cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash))
}; };
match cached { match cached {
Some(score) => sender.send(score).expect(RECEIVER_IN_SCOPE), Some(score) => future::ok(score).boxed(),
None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::ChainScore(sender))), None => {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.map(|(_, s)| s)
.boxed()
},
} }
receiver
} }
/// Request a canonical block's hash and chain score by number. /// Request a canonical block's hash and chain score by number.
/// Returns the hash and chain score. /// Returns the hash and chain score.
pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<(H256, U256)> { pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture<(H256, U256), Canceled> {
let (sender, receiver) = oneshot::channel();
let cached = { let cached = {
let mut cache = self.cache.lock(); let mut cache = self.cache.lock();
let hash = cache.block_hash(&req.num()); let hash = cache.block_hash(&req.num());
@ -200,31 +214,33 @@ impl OnDemand {
}; };
match cached { match cached {
(Some(hash), Some(score)) => sender.send((hash, score)).expect(RECEIVER_IN_SCOPE), (Some(hash), Some(score)) => future::ok((hash, score)).boxed(),
_ => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Both(sender))), _ => {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.boxed()
},
} }
receiver
} }
/// Request a header by hash. This is less accurate than by-number because we don't know /// Request a header by hash. This is less accurate than by-number because we don't know
/// where in the chain this header lies, and therefore can't find a peer who is supposed to have /// where in the chain this header lies, and therefore can't find a peer who is supposed to have
/// it as easily. /// it as easily.
pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver<encoded::Header> { pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> BoxFuture<encoded::Header, Canceled> {
let (sender, receiver) = oneshot::channel();
match { self.cache.lock().block_header(&req.0) } { match { self.cache.lock().block_header(&req.0) } {
Some(hdr) => sender.send(hdr).expect(RECEIVER_IN_SCOPE), Some(hdr) => future::ok(hdr).boxed(),
None => self.dispatch(ctx, Pending::HeaderByHash(req, sender)), None => {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.boxed()
},
} }
receiver
} }
/// Request a block, given its header. Block bodies are requestable by hash only, /// Request a block, given its header. Block bodies are requestable by hash only,
/// and the header is required anyway to verify and complete the block body /// and the header is required anyway to verify and complete the block body
/// -- this just doesn't obscure the network query. /// -- this just doesn't obscure the network query.
pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Receiver<encoded::Block> { pub fn block(&self, ctx: &BasicContext, req: request::Body) -> BoxFuture<encoded::Block, Canceled> {
let (sender, receiver) = oneshot::channel();
// fast path for empty body. // fast path for empty body.
if req.header.transactions_root() == SHA3_NULL_RLP && req.header.uncles_hash() == SHA3_EMPTY_LIST_RLP { if req.header.transactions_root() == SHA3_NULL_RLP && req.header.uncles_hash() == SHA3_EMPTY_LIST_RLP {
let mut stream = RlpStream::new_list(3); let mut stream = RlpStream::new_list(3);
@ -232,7 +248,7 @@ impl OnDemand {
stream.begin_list(0); stream.begin_list(0);
stream.begin_list(0); stream.begin_list(0);
sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE); future::ok(encoded::Block::new(stream.out())).boxed()
} else { } else {
match { self.cache.lock().block_body(&req.hash) } { match { self.cache.lock().block_body(&req.hash) } {
Some(body) => { Some(body) => {
@ -242,98 +258,124 @@ impl OnDemand {
stream.append_raw(&body.at(0).as_raw(), 1); stream.append_raw(&body.at(0).as_raw(), 1);
stream.append_raw(&body.at(1).as_raw(), 1); stream.append_raw(&body.at(1).as_raw(), 1);
sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE); future::ok(encoded::Block::new(stream.out())).boxed()
}
None => {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.boxed()
} }
None => self.dispatch(ctx, Pending::Block(req, sender)),
} }
} }
receiver
} }
/// Request the receipts for a block. The header serves two purposes: /// Request the receipts for a block. The header serves two purposes:
/// provide the block hash to fetch receipts for, and for verification of the receipts root. /// provide the block hash to fetch receipts for, and for verification of the receipts root.
pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Receiver<Vec<Receipt>> { pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> BoxFuture<Vec<Receipt>, Canceled> {
let (sender, receiver) = oneshot::channel();
// fast path for empty receipts. // fast path for empty receipts.
if req.0.receipts_root() == SHA3_NULL_RLP { if req.0.receipts_root() == SHA3_NULL_RLP {
sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE); return future::ok(Vec::new()).boxed()
} else {
match { self.cache.lock().block_receipts(&req.0.hash()) } {
Some(receipts) => sender.send(receipts).expect(RECEIVER_IN_SCOPE),
None => self.dispatch(ctx, Pending::BlockReceipts(req, sender)),
}
} }
receiver match { self.cache.lock().block_receipts(&req.0.hash()) } {
Some(receipts) => future::ok(receipts).boxed(),
None => {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.boxed()
},
}
} }
/// Request an account by address and block header -- which gives a hash to query and a state root /// Request an account by address and block header -- which gives a hash to query and a state root
/// to verify against. /// to verify against.
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver<BasicAccount> { /// `None` here means that no account by the queried key exists in the queried state.
let (sender, receiver) = oneshot::channel(); pub fn account(&self, ctx: &BasicContext, req: request::Account) -> BoxFuture<Option<BasicAccount>, Canceled> {
self.dispatch(ctx, Pending::Account(req, sender)); self.request(ctx, req)
receiver .expect("request given fully fleshed out; qed")
.boxed()
} }
/// Request code by address, known code hash, and block header. /// Request code by address, known code hash, and block header.
pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Receiver<Bytes> { pub fn code(&self, ctx: &BasicContext, req: request::Code) -> BoxFuture<Bytes, Canceled> {
let (sender, receiver) = oneshot::channel();
// fast path for no code. // fast path for no code.
if req.code_hash == SHA3_EMPTY { if req.code_hash == SHA3_EMPTY {
sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE) future::ok(Vec::new()).boxed()
} else { } else {
self.dispatch(ctx, Pending::Code(req, sender)); self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.boxed()
} }
receiver
} }
/// Request proof-of-execution for a transaction. /// Request proof-of-execution for a transaction.
pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> Receiver<Result<Executed, ExecutionError>> { pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> BoxFuture<ExecutionResult, Canceled> {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.boxed()
}
/// Submit a vector of requests to be processed together.
///
/// Fails if back-references are not coherent.
/// The returned vector of responses will correspond to the requests exactly.
pub fn request_raw(&self, ctx: &BasicContext, requests: Vec<Request>)
-> Result<Receiver<Vec<Response>>, basic_request::NoSuchOutput>
{
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.dispatch(ctx, Pending::TxProof(req, sender)); if requests.is_empty() {
assert!(sender.send(Vec::new()).is_ok(), "receiver still in scope; qed");
receiver return Ok(receiver);
} }
// dispatch the request, with a "suitability" function to filter acceptable peers.
fn dispatch(&self, ctx: &BasicContext, pending: Pending) {
let mut builder = basic_request::RequestBuilder::default(); let mut builder = basic_request::RequestBuilder::default();
builder.push(pending.make_request())
.expect("make_request always returns fully complete request; qed");
let complete = builder.build(); let responses = Vec::with_capacity(requests.len());
for request in requests {
let kind = complete.requests()[0].kind(); builder.push(CheckedRequest::from(request))?;
for (id, peer) in self.peers.read().iter() {
if !peer.can_handle(&pending) { continue }
match ctx.request_from(*id, complete.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "{}: Assigned {:?} to peer {}",
req_id, kind, id);
self.pending_requests.write().insert(
req_id,
pending,
);
return
} }
Err(net::Error::NoCredits) => {}
Err(e) => let requests = builder.build();
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), let net_requests = requests.clone().map_requests(|req| req.into_net_request());
let capabilities = guess_capabilities(requests.requests());
self.pending.write().push(Pending {
requests: requests,
net_requests: net_requests,
required_capabilities: capabilities,
responses: responses,
sender: sender,
});
self.attempt_dispatch(ctx);
Ok(receiver)
}
/// Submit a strongly-typed batch of requests.
///
/// Fails if back-reference are not coherent.
pub fn request<T>(&self, ctx: &BasicContext, requests: T) -> Result<OnResponses<T>, basic_request::NoSuchOutput>
where T: request::RequestAdapter
{
self.request_raw(ctx, requests.make_requests()).map(|recv| OnResponses {
receiver: recv,
_marker: PhantomData,
})
}
// maybe dispatch pending requests.
// sometimes
fn attempt_dispatch(&self, ctx: &BasicContext) {
if !self.no_immediate_dispatch {
self.dispatch_pending(ctx)
} }
} }
self.orphaned_requests.write().push(pending); // dispatch pending requests, and discard those for which the corresponding
}
// dispatch orphaned requests, and discard those for which the corresponding
// receiver has been dropped. // receiver has been dropped.
fn dispatch_orphaned(&self, ctx: &BasicContext) { fn dispatch_pending(&self, ctx: &BasicContext) {
// wrapper future for calling `poll_cancel` on our `Senders` to preserve // wrapper future for calling `poll_cancel` on our `Senders` to preserve
// the invariant that it's always within a task. // the invariant that it's always within a task.
struct CheckHangup<'a, T: 'a>(&'a mut Sender<T>); struct CheckHangup<'a, T: 'a>(&'a mut Sender<T>);
@ -356,35 +398,44 @@ impl OnDemand {
CheckHangup(send).wait().expect("CheckHangup always returns ok; qed") CheckHangup(send).wait().expect("CheckHangup always returns ok; qed")
} }
if self.orphaned_requests.read().is_empty() { return } if self.pending.read().is_empty() { return }
let mut pending = self.pending.write();
let to_dispatch = ::std::mem::replace(&mut *self.orphaned_requests.write(), Vec::new()); // iterate over all pending requests, and check them for hang-up.
// then, try and find a peer who can serve it.
let peers = self.peers.read();
*pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter()
.filter_map(|mut pending| match check_hangup(&mut pending.sender) {
false => Some(pending),
true => None,
})
.filter_map(|pending| {
for (peer_id, peer) in peers.iter() { // .shuffle?
// TODO: see which requests can be answered by the cache?
trace!(target: "on_demand", "Attempting to dispatch {} orphaned requests.", to_dispatch.len()); if !peer.can_fulfill(&pending.required_capabilities) {
for mut orphaned in to_dispatch { continue
let hung_up = match orphaned {
Pending::HeaderProof(_, ref mut sender) => match *sender {
ChtProofSender::Both(ref mut s) => check_hangup(s),
ChtProofSender::Hash(ref mut s) => check_hangup(s),
ChtProofSender::ChainScore(ref mut s) => check_hangup(s),
},
Pending::HeaderByHash(_, ref mut sender) => check_hangup(sender),
Pending::Block(_, ref mut sender) => check_hangup(sender),
Pending::BlockReceipts(_, ref mut sender) => check_hangup(sender),
Pending::Account(_, ref mut sender) => check_hangup(sender),
Pending::Code(_, ref mut sender) => check_hangup(sender),
Pending::TxProof(_, ref mut sender) => check_hangup(sender),
};
if !hung_up { self.dispatch(ctx, orphaned) }
} }
match ctx.request_from(*peer_id, pending.net_requests.clone()) {
Ok(req_id) => {
self.in_transit.write().insert(req_id, pending);
return None
}
Err(net::Error::NoCredits) => {}
Err(e) => debug!(target: "on_demand", "Error dispatching request to peer: {}", e),
}
}
Some(pending)
})
.collect(); // `pending` now contains all requests we couldn't dispatch.
} }
} }
impl Handler for OnDemand { impl Handler for OnDemand {
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() }); self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() });
self.dispatch_orphaned(ctx.as_basic()); self.attempt_dispatch(ctx.as_basic());
} }
fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) {
@ -392,16 +443,16 @@ impl Handler for OnDemand {
let ctx = ctx.as_basic(); let ctx = ctx.as_basic();
{ {
let mut orphaned = self.orphaned_requests.write(); let mut pending = self.pending.write();
for unfulfilled in unfulfilled { for unfulfilled in unfulfilled {
if let Some(pending) = self.pending_requests.write().remove(unfulfilled) { if let Some(unfulfilled) = self.in_transit.write().remove(unfulfilled) {
trace!(target: "on_demand", "Attempting to reassign dropped request"); trace!(target: "on_demand", "Attempting to reassign dropped request");
orphaned.push(pending); pending.push(unfulfilled);
} }
} }
} }
self.dispatch_orphaned(ctx); self.attempt_dispatch(ctx);
} }
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
@ -413,183 +464,70 @@ impl Handler for OnDemand {
} }
} }
self.dispatch_orphaned(ctx.as_basic()); self.attempt_dispatch(ctx.as_basic());
} }
fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) { fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) {
let peer = ctx.peer(); use request::IncompleteRequest;
let req = match self.pending_requests.write().remove(&req_id) {
let mut pending = match self.in_transit.write().remove(&req_id) {
Some(req) => req, Some(req) => req,
None => return, None => return,
}; };
let response = match responses.get(0) { // for each incoming response
Some(response) => response, // 1. ensure verification data filled. (still TODO since on_demand doesn't use back-references yet)
None => { // 2. pending.requests.supply_response
trace!(target: "on_demand", "Ignoring empty response for request {}", req_id); // 3. if extracted on-demand response, keep it for later.
self.dispatch(ctx.as_basic(), req); for response in responses {
match pending.requests.supply_response(&*self.cache, response) {
Ok(response) => {
pending.responses.push(response)
}
Err(e) => {
let peer = ctx.peer();
debug!(target: "on_demand", "Peer {} gave bad response: {:?}", peer, e);
ctx.disable_peer(peer);
break;
}
}
}
pending.requests.fill_unanswered();
if pending.requests.is_complete() {
let _ = pending.sender.send(pending.responses);
return; return;
} }
};
trace!(target: "on_demand", "Handling response for request {}, kind={:?}", req_id, response.kind());
// handle the response appropriately for the request. // update network requests (unless we're done, in which case fulfill the future.)
// all branches which do not return early lead to disabling of the peer let mut builder = basic_request::RequestBuilder::default();
// due to misbehavior. let num_answered = pending.requests.num_answered();
match req { let mut mapping = move |idx| idx - num_answered;
Pending::HeaderProof(req, sender) => {
if let NetworkResponse::HeaderProof(ref response) = *response {
match req.check_response(&response.proof) {
Ok((hash, score)) => {
let mut cache = self.cache.lock();
cache.insert_block_hash(req.num(), hash);
cache.insert_chain_score(hash, score);
match sender { for request in pending.requests.requests().iter().skip(num_answered) {
ChtProofSender::Both(sender) => { let _ = sender.send((hash, score)); } let mut net_req = request.clone().into_net_request();
ChtProofSender::Hash(sender) => { let _ = sender.send(hash); }
ChtProofSender::ChainScore(sender) => { let _ = sender.send(score); }
}
return
}
Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e),
}
}
}
Pending::HeaderByHash(req, sender) => {
if let NetworkResponse::Headers(ref response) = *response {
if let Some(header) = response.headers.get(0) {
match req.check_response(header) {
Ok(header) => {
self.cache.lock().insert_block_header(req.0, header.clone());
let _ = sender.send(header);
return
}
Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e),
}
}
}
}
Pending::Block(req, sender) => {
if let NetworkResponse::Body(ref response) = *response {
match req.check_response(&response.body) {
Ok(block) => {
self.cache.lock().insert_block_body(req.hash, response.body.clone());
let _ = sender.send(block);
return
}
Err(e) => warn!(target: "on_demand", "Error handling response for block request: {:?}", e),
}
}
}
Pending::BlockReceipts(req, sender) => {
if let NetworkResponse::Receipts(ref response) = *response {
match req.check_response(&response.receipts) {
Ok(receipts) => {
let hash = req.0.hash();
self.cache.lock().insert_block_receipts(hash, receipts.clone());
let _ = sender.send(receipts);
return
}
Err(e) => warn!(target: "on_demand", "Error handling response for receipts request: {:?}", e),
}
}
}
Pending::Account(req, sender) => {
if let NetworkResponse::Account(ref response) = *response {
match req.check_response(&response.proof) {
Ok(account) => {
let account = account.unwrap_or_else(|| {
BasicAccount {
balance: 0.into(),
nonce: self.start_nonce,
code_hash: SHA3_EMPTY,
storage_root: SHA3_NULL_RLP
}
});
// TODO: validate against request outputs. // all back-references with request index less than `num_answered` have
// needs engine + env info as part of request. // been filled by now. all remaining requests point to nothing earlier
let _ = sender.send(account); // than the next unanswered request.
return net_req.adjust_refs(&mut mapping);
} builder.push(net_req)
Err(e) => warn!(target: "on_demand", "Error handling response for state request: {:?}", e), .expect("all back-references to answered requests have been filled; qed");
}
}
}
Pending::Code(req, sender) => {
if let NetworkResponse::Code(ref response) = *response {
match req.check_response(response.code.as_slice()) {
Ok(()) => {
let _ = sender.send(response.code.clone());
return
}
Err(e) => warn!(target: "on_demand", "Error handling response for code request: {:?}", e),
}
}
}
Pending::TxProof(req, sender) => {
if let NetworkResponse::Execution(ref response) = *response {
match req.check_response(&response.items) {
ProvedExecution::Complete(executed) => {
let _ = sender.send(Ok(executed));
return
}
ProvedExecution::Failed(err) => {
let _ = sender.send(Err(err));
return
}
ProvedExecution::BadProof => warn!(target: "on_demand", "Error handling response for transaction proof request"),
}
}
}
} }
ctx.disable_peer(peer); // update pending fields and re-queue.
let capabilities = guess_capabilities(&pending.requests.requests()[num_answered..]);
pending.net_requests = builder.build();
pending.required_capabilities = capabilities;
self.pending.write().push(pending);
self.attempt_dispatch(ctx.as_basic());
} }
fn tick(&self, ctx: &BasicContext) { fn tick(&self, ctx: &BasicContext) {
self.dispatch_orphaned(ctx) self.attempt_dispatch(ctx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use cache::Cache;
use net::{Announcement, BasicContext, ReqId, Error as LesError};
use request::Requests;
use network::{PeerId, NodeId};
use time::Duration;
use util::{H256, Mutex};
struct FakeContext;
impl BasicContext for FakeContext {
fn persistent_peer_id(&self, _: PeerId) -> Option<NodeId> { None }
fn request_from(&self, _: PeerId, _: Requests) -> Result<ReqId, LesError> {
unimplemented!()
}
fn make_announcement(&self, _: Announcement) { }
fn disconnect_peer(&self, _: PeerId) { }
fn disable_peer(&self, _: PeerId) { }
}
#[test]
fn detects_hangup() {
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
let on_demand = OnDemand::new(cache, 0.into());
let result = on_demand.header_by_hash(&FakeContext, request::HeaderByHash(H256::default()));
assert!(on_demand.orphaned_requests.read().len() == 1);
drop(result);
on_demand.dispatch_orphaned(&FakeContext);
assert!(on_demand.orphaned_requests.read().is_empty());
} }
} }

View File

@ -26,17 +26,374 @@ use ethcore::receipt::Receipt;
use ethcore::state::{self, ProvedExecution}; use ethcore::state::{self, ProvedExecution};
use ethcore::transaction::SignedTransaction; use ethcore::transaction::SignedTransaction;
use request::{self as net_request, IncompleteRequest, Output, OutputKind};
use rlp::{RlpStream, UntrustedRlp}; use rlp::{RlpStream, UntrustedRlp};
use util::{Address, Bytes, DBValue, HashDB, H256, U256}; use util::{Address, Bytes, DBValue, HashDB, Mutex, H256, U256};
use util::memorydb::MemoryDB; use util::memorydb::MemoryDB;
use util::sha3::Hashable; use util::sha3::Hashable;
use util::trie::{Trie, TrieDB, TrieError}; use util::trie::{Trie, TrieDB, TrieError};
const SUPPLIED_MATCHES: &'static str = "supplied responses always match produced requests; enforced by `check_response`; qed";
/// Core unit of the API: submit batches of these to be answered with `Response`s.
#[derive(Clone)]
pub enum Request {
/// A request for a header proof.
HeaderProof(HeaderProof),
/// A request for a header by hash.
HeaderByHash(HeaderByHash),
/// A request for block receipts.
Receipts(BlockReceipts),
/// A request for a block body.
Body(Body),
/// A request for an account.
Account(Account),
/// A request for a contract's code.
Code(Code),
/// A request for proof of execution.
Execution(TransactionProof),
}
/// A request argument.
pub trait RequestArg {
/// the response type.
type Out;
/// Create the request type.
/// `extract` must not fail when presented with the corresponding
/// `Response`.
fn make(self) -> Request;
/// May not panic if the response corresponds with the request
/// from `make`.
/// Is free to panic otherwise.
fn extract(r: Response) -> Self::Out;
}
/// An adapter can be thought of as a grouping of request argument types.
/// This is implemented for various tuples and convenient types.
pub trait RequestAdapter {
/// The output type.
type Out;
/// Infallibly produce requests. When `extract_from` is presented
/// with the corresponding response vector, it may not fail.
fn make_requests(self) -> Vec<Request>;
/// Extract the output type from the given responses.
/// If they are the corresponding responses to the requests
/// made by `make_requests`, do not panic.
fn extract_from(Vec<Response>) -> Self::Out;
}
// helper to implement `RequestArg` and `From` for a single request kind.
macro_rules! impl_single {
($variant: ident, $me: ty, $out: ty) => {
impl RequestArg for $me {
type Out = $out;
fn make(self) -> Request {
Request::$variant(self)
}
fn extract(r: Response) -> $out {
match r {
Response::$variant(x) => x,
_ => panic!(SUPPLIED_MATCHES),
}
}
}
impl From<$me> for Request {
fn from(me: $me) -> Request {
Request::$variant(me)
}
}
}
}
// implement traits for each kind of request.
impl_single!(HeaderProof, HeaderProof, (H256, U256));
impl_single!(HeaderByHash, HeaderByHash, encoded::Header);
impl_single!(Receipts, BlockReceipts, Vec<Receipt>);
impl_single!(Body, Body, encoded::Block);
impl_single!(Account, Account, Option<BasicAccount>);
impl_single!(Code, Code, Bytes);
impl_single!(Execution, TransactionProof, super::ExecutionResult);
macro_rules! impl_args {
() => {
impl<T: RequestArg> RequestAdapter for T {
type Out = T::Out;
fn make_requests(self) -> Vec<Request> {
vec![self.make()]
}
fn extract_from(mut responses: Vec<Response>) -> Self::Out {
T::extract(responses.pop().expect(SUPPLIED_MATCHES))
}
}
};
($first: ident, $($next: ident,)*) => {
impl<
$first: RequestArg,
$($next: RequestArg,)*
>
RequestAdapter for ($first, $($next,)*) {
type Out = ($first::Out, $($next::Out,)*);
fn make_requests(self) -> Vec<Request> {
let ($first, $($next,)*) = self;
vec![
$first.make(),
$($next.make(),)*
]
}
fn extract_from(responses: Vec<Response>) -> Self::Out {
let mut iter = responses.into_iter();
(
$first::extract(iter.next().expect(SUPPLIED_MATCHES)),
$($next::extract(iter.next().expect(SUPPLIED_MATCHES)),)*
)
}
}
impl_args!($($next,)*);
}
}
mod impls {
#![allow(non_snake_case)]
use super::{RequestAdapter, RequestArg, Request, Response, SUPPLIED_MATCHES};
impl_args!(A, B, C, D, E, F, G, H, I, J, K, L,);
}
/// Requests coupled with their required data for verification.
/// This is used internally but not part of the public API.
#[derive(Clone)]
#[allow(missing_docs)]
pub enum CheckedRequest {
HeaderProof(HeaderProof, net_request::IncompleteHeaderProofRequest),
HeaderByHash(HeaderByHash, net_request::IncompleteHeadersRequest),
Receipts(BlockReceipts, net_request::IncompleteReceiptsRequest),
Body(Body, net_request::IncompleteBodyRequest),
Account(Account, net_request::IncompleteAccountRequest),
Code(Code, net_request::IncompleteCodeRequest),
Execution(TransactionProof, net_request::IncompleteExecutionRequest),
}
impl From<Request> for CheckedRequest {
fn from(req: Request) -> Self {
match req {
Request::HeaderByHash(req) => {
let net_req = net_request::IncompleteHeadersRequest {
start: net_request::HashOrNumber::Hash(req.0).into(),
skip: 0,
max: 1,
reverse: false,
};
CheckedRequest::HeaderByHash(req, net_req)
}
Request::HeaderProof(req) => {
let net_req = net_request::IncompleteHeaderProofRequest {
num: req.num().into(),
};
CheckedRequest::HeaderProof(req, net_req)
}
Request::Body(req) => {
let net_req = net_request::IncompleteBodyRequest {
hash: req.hash.into(),
};
CheckedRequest::Body(req, net_req)
}
Request::Receipts(req) => {
let net_req = net_request::IncompleteReceiptsRequest {
hash: req.0.hash().into(),
};
CheckedRequest::Receipts(req, net_req)
}
Request::Account(req) => {
let net_req = net_request::IncompleteAccountRequest {
block_hash: req.header.hash().into(),
address_hash: ::util::Hashable::sha3(&req.address).into(),
};
CheckedRequest::Account(req, net_req)
}
Request::Code(req) => {
let net_req = net_request::IncompleteCodeRequest {
block_hash: req.block_id.0.into(),
code_hash: req.code_hash.into(),
};
CheckedRequest::Code(req, net_req)
}
Request::Execution(req) => {
let net_req = net_request::IncompleteExecutionRequest {
block_hash: req.header.hash().into(),
from: req.tx.sender(),
gas: req.tx.gas,
gas_price: req.tx.gas_price,
action: req.tx.action.clone(),
value: req.tx.value,
data: req.tx.data.clone(),
};
CheckedRequest::Execution(req, net_req)
}
}
}
}
impl CheckedRequest {
/// Convert this into a network request.
pub fn into_net_request(self) -> net_request::Request {
use ::request::Request as NetRequest;
match self {
CheckedRequest::HeaderProof(_, req) => NetRequest::HeaderProof(req),
CheckedRequest::HeaderByHash(_, req) => NetRequest::Headers(req),
CheckedRequest::Receipts(_, req) => NetRequest::Receipts(req),
CheckedRequest::Body(_, req) => NetRequest::Body(req),
CheckedRequest::Account(_, req) => NetRequest::Account(req),
CheckedRequest::Code(_, req) => NetRequest::Code(req),
CheckedRequest::Execution(_, req) => NetRequest::Execution(req),
}
}
}
macro_rules! match_me {
($me: expr, ($check: pat, $req: pat) => $e: expr) => {
match $me {
CheckedRequest::HeaderProof($check, $req) => $e,
CheckedRequest::HeaderByHash($check, $req) => $e,
CheckedRequest::Receipts($check, $req) => $e,
CheckedRequest::Body($check, $req) => $e,
CheckedRequest::Account($check, $req) => $e,
CheckedRequest::Code($check, $req) => $e,
CheckedRequest::Execution($check, $req) => $e,
}
}
}
impl IncompleteRequest for CheckedRequest {
type Complete = net_request::CompleteRequest;
type Response = net_request::Response;
/// Check prior outputs against the needed inputs.
///
/// This is called to ensure consistency of this request with
/// others in the same packet.
fn check_outputs<F>(&self, f: F) -> Result<(), net_request::NoSuchOutput>
where F: FnMut(usize, usize, OutputKind) -> Result<(), net_request::NoSuchOutput>
{
match_me!(*self, (_, ref req) => req.check_outputs(f))
}
/// Note that this request will produce the following outputs.
fn note_outputs<F>(&self, f: F) where F: FnMut(usize, OutputKind) {
match_me!(*self, (_, ref req) => req.note_outputs(f))
}
/// Fill fields of the request.
///
/// This function is provided an "output oracle" which allows fetching of
/// prior request outputs.
/// Only outputs previously checked with `check_outputs` may be available.
fn fill<F>(&mut self, f: F) where F: Fn(usize, usize) -> Result<Output, net_request::NoSuchOutput> {
match_me!(*self, (_, ref mut req) => req.fill(f))
}
/// Will succeed if all fields have been filled, will fail otherwise.
fn complete(self) -> Result<Self::Complete, net_request::NoSuchOutput> {
use ::request::CompleteRequest;
match self {
CheckedRequest::HeaderProof(_, req) => req.complete().map(CompleteRequest::HeaderProof),
CheckedRequest::HeaderByHash(_, req) => req.complete().map(CompleteRequest::Headers),
CheckedRequest::Receipts(_, req) => req.complete().map(CompleteRequest::Receipts),
CheckedRequest::Body(_, req) => req.complete().map(CompleteRequest::Body),
CheckedRequest::Account(_, req) => req.complete().map(CompleteRequest::Account),
CheckedRequest::Code(_, req) => req.complete().map(CompleteRequest::Code),
CheckedRequest::Execution(_, req) => req.complete().map(CompleteRequest::Execution),
}
}
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
match_me!(*self, (_, ref mut req) => req.adjust_refs(mapping))
}
}
impl net_request::CheckedRequest for CheckedRequest {
type Extract = Response;
type Error = Error;
type Environment = Mutex<::cache::Cache>;
/// Check whether the response matches (beyond the type).
fn check_response(&self, cache: &Mutex<::cache::Cache>, response: &Self::Response) -> Result<Response, Error> {
use ::request::Response as NetResponse;
// helper for expecting a specific response for a given request.
macro_rules! expect {
($res: pat => $e: expr) => {
match *response {
$res => $e,
_ => Err(Error::WrongKind),
}
}
}
// check response against contained prover.
match *self {
CheckedRequest::HeaderProof(ref prover, _) => expect!(NetResponse::HeaderProof(ref res) =>
prover.check_response(cache, &res.proof).map(Response::HeaderProof)),
CheckedRequest::HeaderByHash(ref prover, _) => expect!(NetResponse::Headers(ref res) =>
prover.check_response(cache, &res.headers).map(Response::HeaderByHash)),
CheckedRequest::Receipts(ref prover, _) => expect!(NetResponse::Receipts(ref res) =>
prover.check_response(cache, &res.receipts).map(Response::Receipts)),
CheckedRequest::Body(ref prover, _) => expect!(NetResponse::Body(ref res) =>
prover.check_response(cache, &res.body).map(Response::Body)),
CheckedRequest::Account(ref prover, _) => expect!(NetResponse::Account(ref res) =>
prover.check_response(cache, &res.proof).map(Response::Account)),
CheckedRequest::Code(ref prover, _) => expect!(NetResponse::Code(ref res) =>
prover.check_response(cache, &res.code).map(Response::Code)),
CheckedRequest::Execution(ref prover, _) => expect!(NetResponse::Execution(ref res) =>
prover.check_response(cache, &res.items).map(Response::Execution)),
}
}
}
/// Responses to on-demand requests.
/// All of these are checked.
pub enum Response {
/// Response to a header proof request.
/// Returns the hash and chain score.
HeaderProof((H256, U256)),
/// Response to a header-by-hash request.
HeaderByHash(encoded::Header),
/// Response to a receipts request.
Receipts(Vec<Receipt>),
/// Response to a block body request.
Body(encoded::Block),
/// Response to an Account request.
// TODO: `unwrap_or(engine_defaults)`
Account(Option<BasicAccount>),
/// Response to a request for code.
Code(Vec<u8>),
/// Response to a request for proved execution.
Execution(super::ExecutionResult),
}
/// Errors in verification. /// Errors in verification.
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum Error { pub enum Error {
/// RLP decoder error. /// RLP decoder error.
Decoder(::rlp::DecoderError), Decoder(::rlp::DecoderError),
/// Empty response.
Empty,
/// Trie lookup error (result of bad proof) /// Trie lookup error (result of bad proof)
Trie(TrieError), Trie(TrieError),
/// Bad inclusion proof /// Bad inclusion proof
@ -47,6 +404,8 @@ pub enum Error {
WrongHash(H256, H256), WrongHash(H256, H256),
/// Wrong trie root. /// Wrong trie root.
WrongTrieRoot(H256, H256), WrongTrieRoot(H256, H256),
/// Wrong response kind.
WrongKind,
} }
impl From<::rlp::DecoderError> for Error { impl From<::rlp::DecoderError> for Error {
@ -93,9 +452,15 @@ impl HeaderProof {
pub fn cht_root(&self) -> H256 { self.cht_root } pub fn cht_root(&self) -> H256 { self.cht_root }
/// Check a response with a CHT proof, get a hash and total difficulty back. /// Check a response with a CHT proof, get a hash and total difficulty back.
pub fn check_response(&self, proof: &[Bytes]) -> Result<(H256, U256), Error> { pub fn check_response(&self, cache: &Mutex<::cache::Cache>, proof: &[Bytes]) -> Result<(H256, U256), Error> {
match ::cht::check_proof(proof, self.num, self.cht_root) { match ::cht::check_proof(proof, self.num, self.cht_root) {
Some((expected_hash, td)) => Ok((expected_hash, td)), Some((expected_hash, td)) => {
let mut cache = cache.lock();
cache.insert_block_hash(self.num, expected_hash);
cache.insert_chain_score(expected_hash, td);
Ok((expected_hash, td))
}
None => Err(Error::BadProof), None => Err(Error::BadProof),
} }
} }
@ -107,10 +472,14 @@ pub struct HeaderByHash(pub H256);
impl HeaderByHash { impl HeaderByHash {
/// Check a response for the header. /// Check a response for the header.
pub fn check_response(&self, header: &encoded::Header) -> Result<encoded::Header, Error> { pub fn check_response(&self, cache: &Mutex<::cache::Cache>, headers: &[encoded::Header]) -> Result<encoded::Header, Error> {
let header = headers.get(0).ok_or(Error::Empty)?;
let hash = header.sha3(); let hash = header.sha3();
match hash == self.0 { match hash == self.0 {
true => Ok(header.clone()), true => {
cache.lock().insert_block_header(hash, header.clone());
Ok(header.clone())
}
false => Err(Error::WrongHash(self.0, hash)), false => Err(Error::WrongHash(self.0, hash)),
} }
} }
@ -136,7 +505,7 @@ impl Body {
} }
/// Check a response for this block body. /// Check a response for this block body.
pub fn check_response(&self, body: &encoded::Body) -> Result<encoded::Block, Error> { pub fn check_response(&self, cache: &Mutex<::cache::Cache>, body: &encoded::Body) -> Result<encoded::Block, Error> {
// check the integrity of the the body against the header // check the integrity of the the body against the header
let tx_root = ::util::triehash::ordered_trie_root(body.rlp().at(0).iter().map(|r| r.as_raw().to_vec())); let tx_root = ::util::triehash::ordered_trie_root(body.rlp().at(0).iter().map(|r| r.as_raw().to_vec()));
if tx_root != self.header.transactions_root() { if tx_root != self.header.transactions_root() {
@ -154,6 +523,8 @@ impl Body {
stream.append_raw(body.rlp().at(0).as_raw(), 1); stream.append_raw(body.rlp().at(0).as_raw(), 1);
stream.append_raw(body.rlp().at(1).as_raw(), 1); stream.append_raw(body.rlp().at(1).as_raw(), 1);
cache.lock().insert_block_body(self.hash, body.clone());
Ok(encoded::Block::new(stream.out())) Ok(encoded::Block::new(stream.out()))
} }
} }
@ -164,12 +535,15 @@ pub struct BlockReceipts(pub encoded::Header);
impl BlockReceipts { impl BlockReceipts {
/// Check a response with receipts against the stored header. /// Check a response with receipts against the stored header.
pub fn check_response(&self, receipts: &[Receipt]) -> Result<Vec<Receipt>, Error> { pub fn check_response(&self, cache: &Mutex<::cache::Cache>, receipts: &[Receipt]) -> Result<Vec<Receipt>, Error> {
let receipts_root = self.0.receipts_root(); let receipts_root = self.0.receipts_root();
let found_root = ::util::triehash::ordered_trie_root(receipts.iter().map(|r| ::rlp::encode(r).to_vec())); let found_root = ::util::triehash::ordered_trie_root(receipts.iter().map(|r| ::rlp::encode(r).to_vec()));
match receipts_root == found_root { match receipts_root == found_root {
true => Ok(receipts.to_vec()), true => {
cache.lock().insert_block_receipts(receipts_root, receipts.to_vec());
Ok(receipts.to_vec())
}
false => Err(Error::WrongTrieRoot(receipts_root, found_root)), false => Err(Error::WrongTrieRoot(receipts_root, found_root)),
} }
} }
@ -186,7 +560,7 @@ pub struct Account {
impl Account { impl Account {
/// Check a response with an account against the stored header. /// Check a response with an account against the stored header.
pub fn check_response(&self, proof: &[Bytes]) -> Result<Option<BasicAccount>, Error> { pub fn check_response(&self, _: &Mutex<::cache::Cache>, proof: &[Bytes]) -> Result<Option<BasicAccount>, Error> {
let state_root = self.header.state_root(); let state_root = self.header.state_root();
let mut db = MemoryDB::new(); let mut db = MemoryDB::new();
@ -208,6 +582,7 @@ impl Account {
} }
/// Request for account code. /// Request for account code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Code { pub struct Code {
/// Block hash, number pair. /// Block hash, number pair.
pub block_id: (H256, u64), pub block_id: (H256, u64),
@ -217,10 +592,10 @@ pub struct Code {
impl Code { impl Code {
/// Check a response with code against the code hash. /// Check a response with code against the code hash.
pub fn check_response(&self, code: &[u8]) -> Result<(), Error> { pub fn check_response(&self, _: &Mutex<::cache::Cache>, code: &[u8]) -> Result<Vec<u8>, Error> {
let found_hash = code.sha3(); let found_hash = code.sha3();
if found_hash == self.code_hash { if found_hash == self.code_hash {
Ok(()) Ok(code.to_vec())
} else { } else {
Err(Error::WrongHash(self.code_hash, found_hash)) Err(Error::WrongHash(self.code_hash, found_hash))
} }
@ -228,6 +603,7 @@ impl Code {
} }
/// Request for transaction execution, along with the parts necessary to verify the proof. /// Request for transaction execution, along with the parts necessary to verify the proof.
#[derive(Clone)]
pub struct TransactionProof { pub struct TransactionProof {
/// The transaction to request proof of. /// The transaction to request proof of.
pub tx: SignedTransaction, pub tx: SignedTransaction,
@ -241,25 +617,32 @@ pub struct TransactionProof {
impl TransactionProof { impl TransactionProof {
/// Check the proof, returning the proved execution or indicate that the proof was bad. /// Check the proof, returning the proved execution or indicate that the proof was bad.
pub fn check_response(&self, state_items: &[DBValue]) -> ProvedExecution { pub fn check_response(&self, _: &Mutex<::cache::Cache>, state_items: &[DBValue]) -> Result<super::ExecutionResult, Error> {
let root = self.header.state_root(); let root = self.header.state_root();
let mut env_info = self.env_info.clone(); let mut env_info = self.env_info.clone();
env_info.gas_limit = self.tx.gas.clone(); env_info.gas_limit = self.tx.gas.clone();
state::check_proof(
let proved_execution = state::check_proof(
state_items, state_items,
root, root,
&self.tx, &self.tx,
&*self.engine, &*self.engine,
&env_info, &self.env_info,
) );
match proved_execution {
ProvedExecution::BadProof => Err(Error::BadProof),
ProvedExecution::Failed(e) => Ok(Err(e)),
ProvedExecution::Complete(e) => Ok(Ok(e)),
}
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use util::{MemoryDB, Address, H256}; use util::{MemoryDB, Address, Mutex, H256};
use util::trie::{Trie, TrieMut, SecTrieDB, SecTrieDBMut}; use util::trie::{Trie, TrieMut, SecTrieDB, SecTrieDBMut};
use util::trie::recorder::Recorder; use util::trie::recorder::Recorder;
@ -268,6 +651,10 @@ mod tests {
use ethcore::encoded; use ethcore::encoded;
use ethcore::receipt::Receipt; use ethcore::receipt::Receipt;
fn make_cache() -> ::cache::Cache {
::cache::Cache::new(Default::default(), ::time::Duration::seconds(1))
}
#[test] #[test]
fn no_invalid_header_by_number() { fn no_invalid_header_by_number() {
assert!(HeaderProof::new(0, Default::default()).is_none()) assert!(HeaderProof::new(0, Default::default()).is_none())
@ -297,7 +684,8 @@ mod tests {
let proof = cht.prove(10_000, 0).unwrap().unwrap(); let proof = cht.prove(10_000, 0).unwrap().unwrap();
let req = HeaderProof::new(10_000, cht.root()).unwrap(); let req = HeaderProof::new(10_000, cht.root()).unwrap();
assert!(req.check_response(&proof[..]).is_ok()); let cache = Mutex::new(make_cache());
assert!(req.check_response(&cache, &proof[..]).is_ok());
} }
#[test] #[test]
@ -308,7 +696,8 @@ mod tests {
let hash = header.hash(); let hash = header.hash();
let raw_header = encoded::Header::new(::rlp::encode(&header).to_vec()); let raw_header = encoded::Header::new(::rlp::encode(&header).to_vec());
assert!(HeaderByHash(hash).check_response(&raw_header).is_ok()) let cache = Mutex::new(make_cache());
assert!(HeaderByHash(hash).check_response(&cache, &[raw_header]).is_ok())
} }
#[test] #[test]
@ -324,8 +713,9 @@ mod tests {
hash: header.hash(), hash: header.hash(),
}; };
let cache = Mutex::new(make_cache());
let response = encoded::Body::new(body_stream.drain().to_vec()); let response = encoded::Body::new(body_stream.drain().to_vec());
assert!(req.check_response(&response).is_ok()) assert!(req.check_response(&cache, &response).is_ok())
} }
#[test] #[test]
@ -346,7 +736,8 @@ mod tests {
let req = BlockReceipts(encoded::Header::new(::rlp::encode(&header).to_vec())); let req = BlockReceipts(encoded::Header::new(::rlp::encode(&header).to_vec()));
assert!(req.check_response(&receipts).is_ok()) let cache = Mutex::new(make_cache());
assert!(req.check_response(&cache, &receipts).is_ok())
} }
#[test] #[test]
@ -395,7 +786,8 @@ mod tests {
address: addr, address: addr,
}; };
assert!(req.check_response(&proof[..]).is_ok()); let cache = Mutex::new(make_cache());
assert!(req.check_response(&cache, &proof[..]).is_ok());
} }
#[test] #[test]
@ -406,7 +798,8 @@ mod tests {
code_hash: ::util::Hashable::sha3(&code), code_hash: ::util::Hashable::sha3(&code),
}; };
assert!(req.check_response(&code).is_ok()); let cache = Mutex::new(make_cache());
assert!(req.check_response(&[]).is_err()); assert!(req.check_response(&cache, &code).is_ok());
assert!(req.check_response(&cache, &[]).is_err());
} }
} }

View File

@ -0,0 +1,397 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Tests for the on-demand service.
use cache::Cache;
use ethcore::encoded;
use ethcore::header::{Header, Seal};
use futures::Future;
use network::{PeerId, NodeId};
use net::*;
use util::{H256, Mutex};
use time::Duration;
use ::request::{self as basic_request, Response};
use std::sync::Arc;
use super::{request, OnDemand, Peer};
// useful contexts to give the service.
enum Context {
NoOp,
WithPeer(PeerId),
RequestFrom(PeerId, ReqId),
Punish(PeerId),
}
impl EventContext for Context {
fn peer(&self) -> PeerId {
match *self {
Context::WithPeer(id)
| Context::RequestFrom(id, _)
| Context::Punish(id) => id,
_ => panic!("didn't expect to have peer queried."),
}
}
fn as_basic(&self) -> &BasicContext { self }
}
impl BasicContext for Context {
/// Returns the relevant's peer persistent Id (aka NodeId).
fn persistent_peer_id(&self, _: PeerId) -> Option<NodeId> {
panic!("didn't expect to provide persistent ID")
}
fn request_from(&self, peer_id: PeerId, _: ::request::NetworkRequests) -> Result<ReqId, Error> {
match *self {
Context::RequestFrom(id, req_id) => if peer_id == id { Ok(req_id) } else { Err(Error::NoCredits) },
_ => panic!("didn't expect to have requests dispatched."),
}
}
fn make_announcement(&self, _: Announcement) {
panic!("didn't expect to make announcement")
}
fn disconnect_peer(&self, id: PeerId) {
self.disable_peer(id)
}
fn disable_peer(&self, peer_id: PeerId) {
match *self {
Context::Punish(id) if id == peer_id => {},
_ => panic!("Unexpectedly punished peer."),
}
}
}
// test harness.
struct Harness {
service: OnDemand,
}
impl Harness {
fn create() -> Self {
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::minutes(1))));
Harness {
service: OnDemand::new_test(cache),
}
}
fn inject_peer(&self, id: PeerId, peer: Peer) {
self.service.peers.write().insert(id, peer);
}
}
fn dummy_status() -> Status {
Status {
protocol_version: 1,
network_id: 999,
head_td: 1.into(),
head_hash: H256::default(),
head_num: 1359,
genesis_hash: H256::default(),
last_head: None,
}
}
fn dummy_capabilities() -> Capabilities {
Capabilities {
serve_headers: true,
serve_chain_since: Some(1),
serve_state_since: Some(1),
tx_relay: true,
}
}
#[test]
fn detects_hangup() {
let on_demand = Harness::create().service;
let result = on_demand.header_by_hash(&Context::NoOp, request::HeaderByHash(H256::default()));
assert_eq!(on_demand.pending.read().len(), 1);
drop(result);
on_demand.dispatch_pending(&Context::NoOp);
assert!(on_demand.pending.read().is_empty());
}
#[test]
fn single_request() {
let harness = Harness::create();
let peer_id = 10101;
let req_id = ReqId(14426);
harness.inject_peer(peer_id, Peer {
status: dummy_status(),
capabilities: dummy_capabilities(),
});
let header = Header::default();
let encoded = encoded::Header::new(header.rlp(Seal::With));
let recv = harness.service.request_raw(
&Context::NoOp,
vec![request::HeaderByHash(header.hash()).into()]
).unwrap();
assert_eq!(harness.service.pending.read().len(), 1);
harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_id));
assert_eq!(harness.service.pending.read().len(), 0);
harness.service.on_responses(
&Context::WithPeer(peer_id),
req_id,
&[Response::Headers(basic_request::HeadersResponse { headers: vec![encoded] })]
);
assert!(recv.wait().is_ok());
}
#[test]
fn no_capabilities() {
let harness = Harness::create();
let peer_id = 10101;
let mut capabilities = dummy_capabilities();
capabilities.serve_headers = false;
harness.inject_peer(peer_id, Peer {
status: dummy_status(),
capabilities: capabilities,
});
let _recv = harness.service.request_raw(
&Context::NoOp,
vec![request::HeaderByHash(Default::default()).into()]
).unwrap();
assert_eq!(harness.service.pending.read().len(), 1);
harness.service.dispatch_pending(&Context::NoOp);
assert_eq!(harness.service.pending.read().len(), 1);
}
#[test]
fn reassign() {
let harness = Harness::create();
let peer_ids = (10101, 12345);
let req_ids = (ReqId(14426), ReqId(555));
harness.inject_peer(peer_ids.0, Peer {
status: dummy_status(),
capabilities: dummy_capabilities(),
});
let header = Header::default();
let encoded = encoded::Header::new(header.rlp(Seal::With));
let recv = harness.service.request_raw(
&Context::NoOp,
vec![request::HeaderByHash(header.hash()).into()]
).unwrap();
assert_eq!(harness.service.pending.read().len(), 1);
harness.service.dispatch_pending(&Context::RequestFrom(peer_ids.0, req_ids.0));
assert_eq!(harness.service.pending.read().len(), 0);
harness.service.on_disconnect(&Context::WithPeer(peer_ids.0), &[req_ids.0]);
assert_eq!(harness.service.pending.read().len(), 1);
harness.inject_peer(peer_ids.1, Peer {
status: dummy_status(),
capabilities: dummy_capabilities(),
});
harness.service.dispatch_pending(&Context::RequestFrom(peer_ids.1, req_ids.1));
assert_eq!(harness.service.pending.read().len(), 0);
harness.service.on_responses(
&Context::WithPeer(peer_ids.1),
req_ids.1,
&[Response::Headers(basic_request::HeadersResponse { headers: vec![encoded] })]
);
assert!(recv.wait().is_ok());
}
#[test]
fn partial_response() {
let harness = Harness::create();
let peer_id = 111;
let req_ids = (ReqId(14426), ReqId(555));
harness.inject_peer(peer_id, Peer {
status: dummy_status(),
capabilities: dummy_capabilities(),
});
let make = |num| {
let mut hdr = Header::default();
hdr.set_number(num);
let encoded = encoded::Header::new(hdr.rlp(Seal::With));
(hdr, encoded)
};
let (header1, encoded1) = make(5);
let (header2, encoded2) = make(23452);
// request two headers.
let recv = harness.service.request_raw(
&Context::NoOp,
vec![
request::HeaderByHash(header1.hash()).into(),
request::HeaderByHash(header2.hash()).into(),
],
).unwrap();
assert_eq!(harness.service.pending.read().len(), 1);
harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_ids.0));
assert_eq!(harness.service.pending.read().len(), 0);
// supply only the first one.
harness.service.on_responses(
&Context::WithPeer(peer_id),
req_ids.0,
&[Response::Headers(basic_request::HeadersResponse { headers: vec![encoded1] })]
);
assert_eq!(harness.service.pending.read().len(), 1);
harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_ids.1));
assert_eq!(harness.service.pending.read().len(), 0);
// supply the next one.
harness.service.on_responses(
&Context::WithPeer(peer_id),
req_ids.1,
&[Response::Headers(basic_request::HeadersResponse { headers: vec![encoded2] })]
);
assert!(recv.wait().is_ok());
}
#[test]
fn part_bad_part_good() {
let harness = Harness::create();
let peer_id = 111;
let req_ids = (ReqId(14426), ReqId(555));
harness.inject_peer(peer_id, Peer {
status: dummy_status(),
capabilities: dummy_capabilities(),
});
let make = |num| {
let mut hdr = Header::default();
hdr.set_number(num);
let encoded = encoded::Header::new(hdr.rlp(Seal::With));
(hdr, encoded)
};
let (header1, encoded1) = make(5);
let (header2, encoded2) = make(23452);
// request two headers.
let recv = harness.service.request_raw(
&Context::NoOp,
vec![
request::HeaderByHash(header1.hash()).into(),
request::HeaderByHash(header2.hash()).into(),
],
).unwrap();
assert_eq!(harness.service.pending.read().len(), 1);
harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_ids.0));
assert_eq!(harness.service.pending.read().len(), 0);
// supply only the first one, but followed by the wrong kind of response.
// the first header should be processed.
harness.service.on_responses(
&Context::Punish(peer_id),
req_ids.0,
&[
Response::Headers(basic_request::HeadersResponse { headers: vec![encoded1] }),
Response::Receipts(basic_request::ReceiptsResponse { receipts: vec![] } ),
]
);
assert_eq!(harness.service.pending.read().len(), 1);
harness.inject_peer(peer_id, Peer {
status: dummy_status(),
capabilities: dummy_capabilities(),
});
harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_ids.1));
assert_eq!(harness.service.pending.read().len(), 0);
// supply the next one.
harness.service.on_responses(
&Context::WithPeer(peer_id),
req_ids.1,
&[Response::Headers(basic_request::HeadersResponse { headers: vec![encoded2] })]
);
assert!(recv.wait().is_ok());
}
#[test]
fn wrong_kind() {
let harness = Harness::create();
let peer_id = 10101;
let req_id = ReqId(14426);
harness.inject_peer(peer_id, Peer {
status: dummy_status(),
capabilities: dummy_capabilities(),
});
let _recv = harness.service.request_raw(
&Context::NoOp,
vec![request::HeaderByHash(Default::default()).into()]
).unwrap();
assert_eq!(harness.service.pending.read().len(), 1);
harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_id));
assert_eq!(harness.service.pending.read().len(), 0);
harness.service.on_responses(
&Context::Punish(peer_id),
req_id,
&[Response::Receipts(basic_request::ReceiptsResponse { receipts: vec![] })]
);
assert_eq!(harness.service.pending.read().len(), 1);
}

View File

@ -20,22 +20,30 @@
use std::collections::HashMap; use std::collections::HashMap;
use request::{ use request::{
IncompleteRequest, CompleteRequest, Request, IncompleteRequest, OutputKind, Output, NoSuchOutput, ResponseError, ResponseLike,
OutputKind, Output, NoSuchOutput, Response, ResponseError,
}; };
/// Build chained requests. Push them onto the series with `push`, /// Build chained requests. Push them onto the series with `push`,
/// and produce a `Requests` object with `build`. Outputs are checked for consistency. /// and produce a `Requests` object with `build`. Outputs are checked for consistency.
#[derive(Debug, Default, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestBuilder { pub struct RequestBuilder<T> {
output_kinds: HashMap<(usize, usize), OutputKind>, output_kinds: HashMap<(usize, usize), OutputKind>,
requests: Vec<Request>, requests: Vec<T>,
} }
impl RequestBuilder { impl<T> Default for RequestBuilder<T> {
fn default() -> Self {
RequestBuilder {
output_kinds: HashMap::new(),
requests: Vec::new(),
}
}
}
impl<T: IncompleteRequest> RequestBuilder<T> {
/// Attempt to push a request onto the request chain. Fails if the request /// Attempt to push a request onto the request chain. Fails if the request
/// references a non-existent output of a prior request. /// references a non-existent output of a prior request.
pub fn push(&mut self, request: Request) -> Result<(), NoSuchOutput> { pub fn push(&mut self, request: T) -> Result<(), NoSuchOutput> {
request.check_outputs(|req, idx, kind| { request.check_outputs(|req, idx, kind| {
match self.output_kinds.get(&(req, idx)) { match self.output_kinds.get(&(req, idx)) {
Some(k) if k == &kind => Ok(()), Some(k) if k == &kind => Ok(()),
@ -54,7 +62,7 @@ impl RequestBuilder {
} }
/// Convert this into a "requests" object. /// Convert this into a "requests" object.
pub fn build(self) -> Requests { pub fn build(self) -> Requests<T> {
Requests { Requests {
outputs: HashMap::new(), outputs: HashMap::new(),
requests: self.requests, requests: self.requests,
@ -65,44 +73,41 @@ impl RequestBuilder {
/// Requests pending responses. /// Requests pending responses.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct Requests { pub struct Requests<T> {
outputs: HashMap<(usize, usize), Output>, outputs: HashMap<(usize, usize), Output>,
requests: Vec<Request>, requests: Vec<T>,
answered: usize, answered: usize,
} }
impl Requests { impl<T> Requests<T> {
/// For each request, produce responses for each.
/// The responses vector produced goes up to the point where the responder
/// first returns `None`, an invalid response, or until all requests have been responded to.
pub fn respond_to_all<F>(mut self, responder: F) -> Vec<Response>
where F: Fn(CompleteRequest) -> Option<Response>
{
let mut responses = Vec::new();
while let Some(response) = self.next_complete().and_then(&responder) {
match self.supply_response(&response) {
Ok(()) => responses.push(response),
Err(e) => {
debug!(target: "pip", "produced bad response to request: {:?}", e);
return responses;
}
}
}
responses
}
/// Get access to the underlying slice of requests. /// Get access to the underlying slice of requests.
// TODO: unimplemented -> Vec<Request>, // do we _have to_ allocate? // TODO: unimplemented -> Vec<Request>, // do we _have to_ allocate?
pub fn requests(&self) -> &[Request] { &self.requests } pub fn requests(&self) -> &[T] { &self.requests }
/// Get the number of answered requests. /// Get the number of answered requests.
pub fn num_answered(&self) -> usize { self.answered } pub fn num_answered(&self) -> usize { self.answered }
/// Whether the batch is complete.
pub fn is_complete(&self) -> bool {
self.answered == self.requests.len()
}
/// Map requests from one type into another.
pub fn map_requests<F, U>(self, f: F) -> Requests<U>
where F: FnMut(T) -> U, U: IncompleteRequest
{
Requests {
outputs: self.outputs,
requests: self.requests.into_iter().map(f).collect(),
answered: self.answered,
}
}
}
impl<T: IncompleteRequest + Clone> Requests<T> {
/// Get the next request as a filled request. Returns `None` when all requests answered. /// Get the next request as a filled request. Returns `None` when all requests answered.
pub fn next_complete(&self) -> Option<CompleteRequest> { pub fn next_complete(&self) -> Option<T::Complete> {
if self.answered == self.requests.len() { if self.is_complete() {
None None
} else { } else {
Some(self.requests[self.answered].clone() Some(self.requests[self.answered].clone()
@ -111,14 +116,29 @@ impl Requests {
} }
} }
/// Sweep through all unanswered requests, filling them as necessary.
pub fn fill_unanswered(&mut self) {
let outputs = &mut self.outputs;
for req in self.requests.iter_mut().skip(self.answered) {
req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput))
}
}
}
impl<T: super::CheckedRequest> Requests<T> {
/// Supply a response for the next request. /// Supply a response for the next request.
/// Fails on: wrong request kind, all requests answered already. /// Fails on: wrong request kind, all requests answered already.
pub fn supply_response(&mut self, response: &Response) -> Result<(), ResponseError> { pub fn supply_response(&mut self, env: &T::Environment, response: &T::Response)
-> Result<T::Extract, ResponseError<T::Error>>
{
let idx = self.answered; let idx = self.answered;
// check validity. // check validity.
if idx == self.requests.len() { return Err(ResponseError::Unexpected) } if self.is_complete() { return Err(ResponseError::Unexpected) }
if self.requests[idx].kind() != response.kind() { return Err(ResponseError::WrongKind) }
let extracted = self.requests[idx]
.check_response(env, response).map_err(ResponseError::Validity)?;
let outputs = &mut self.outputs; let outputs = &mut self.outputs;
response.fill_outputs(|out_idx, output| { response.fill_outputs(|out_idx, output| {
@ -135,7 +155,30 @@ impl Requests {
req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput)) req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput))
} }
Ok(()) Ok(extracted)
}
}
impl Requests<super::Request> {
/// For each request, produce a response.
/// The responses vector produced goes up to the point where the responder
/// first returns `None`, an invalid response, or until all requests have been responded to.
pub fn respond_to_all<F>(mut self, responder: F) -> Vec<super::Response>
where F: Fn(super::CompleteRequest) -> Option<super::Response>
{
let mut responses = Vec::new();
while let Some(response) = self.next_complete().and_then(&responder) {
match self.supply_response(&(), &response) {
Ok(()) => responses.push(response),
Err(e) => {
debug!(target: "pip", "produced bad response to request: {:?}", e);
return responses;
}
}
}
responses
} }
} }

View File

@ -69,11 +69,15 @@ pub use self::builder::{RequestBuilder, Requests};
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NoSuchOutput; pub struct NoSuchOutput;
/// Wrong kind of response corresponding to request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WrongKind;
/// Error on processing a response. /// Error on processing a response.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResponseError { pub enum ResponseError<T> {
/// Wrong kind of response. /// Error in validity.
WrongKind, Validity(T),
/// No responses expected. /// No responses expected.
Unexpected, Unexpected,
} }
@ -96,6 +100,12 @@ impl<T> Field<T> {
_ => Err(NoSuchOutput), _ => Err(NoSuchOutput),
} }
} }
fn adjust_req<F>(&mut self, mut mapping: F) where F: FnMut(usize) -> usize {
if let Field::BackReference(ref mut req_idx, _) = *self {
*req_idx = mapping(*req_idx)
}
}
} }
impl<T> From<T> for Field<T> { impl<T> From<T> for Field<T> {
@ -197,6 +207,9 @@ impl Encodable for HashOrNumber {
} }
} }
/// Type alias for "network requests".
pub type NetworkRequests = Requests<Request>;
/// All request types, as they're sent over the network. /// All request types, as they're sent over the network.
/// They may be incomplete, with back-references to outputs /// They may be incomplete, with back-references to outputs
/// of prior requests. /// of prior requests.
@ -296,6 +309,7 @@ impl Encodable for Request {
impl IncompleteRequest for Request { impl IncompleteRequest for Request {
type Complete = CompleteRequest; type Complete = CompleteRequest;
type Response = Response;
fn check_outputs<F>(&self, f: F) -> Result<(), NoSuchOutput> fn check_outputs<F>(&self, f: F) -> Result<(), NoSuchOutput>
where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput>
@ -350,6 +364,33 @@ impl IncompleteRequest for Request {
Request::Execution(req) => req.complete().map(CompleteRequest::Execution), Request::Execution(req) => req.complete().map(CompleteRequest::Execution),
} }
} }
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
match *self {
Request::Headers(ref mut req) => req.adjust_refs(mapping),
Request::HeaderProof(ref mut req) => req.adjust_refs(mapping),
Request::Receipts(ref mut req) => req.adjust_refs(mapping),
Request::Body(ref mut req) => req.adjust_refs(mapping),
Request::Account(ref mut req) => req.adjust_refs(mapping),
Request::Storage(ref mut req) => req.adjust_refs(mapping),
Request::Code(ref mut req) => req.adjust_refs(mapping),
Request::Execution(ref mut req) => req.adjust_refs(mapping),
}
}
}
impl CheckedRequest for Request {
type Extract = ();
type Error = WrongKind;
type Environment = ();
fn check_response(&self, _: &(), response: &Response) -> Result<(), WrongKind> {
if self.kind() == response.kind() {
Ok(())
} else {
Err(WrongKind)
}
}
} }
/// Kinds of requests. /// Kinds of requests.
@ -421,9 +462,9 @@ pub enum Response {
Execution(ExecutionResponse), Execution(ExecutionResponse),
} }
impl Response { impl ResponseLike for Response {
/// Fill reusable outputs by writing them into the function. /// Fill reusable outputs by writing them into the function.
pub fn fill_outputs<F>(&self, f: F) where F: FnMut(usize, Output) { fn fill_outputs<F>(&self, f: F) where F: FnMut(usize, Output) {
match *self { match *self {
Response::Headers(ref res) => res.fill_outputs(f), Response::Headers(ref res) => res.fill_outputs(f),
Response::HeaderProof(ref res) => res.fill_outputs(f), Response::HeaderProof(ref res) => res.fill_outputs(f),
@ -435,7 +476,9 @@ impl Response {
Response::Execution(ref res) => res.fill_outputs(f), Response::Execution(ref res) => res.fill_outputs(f),
} }
} }
}
impl Response {
/// Inspect the kind of this response. /// Inspect the kind of this response.
pub fn kind(&self) -> Kind { pub fn kind(&self) -> Kind {
match *self { match *self {
@ -490,6 +533,8 @@ impl Encodable for Response {
pub trait IncompleteRequest: Sized { pub trait IncompleteRequest: Sized {
/// The complete variant of this request. /// The complete variant of this request.
type Complete; type Complete;
/// The response to this request.
type Response: ResponseLike;
/// Check prior outputs against the needed inputs. /// Check prior outputs against the needed inputs.
/// ///
@ -511,6 +556,30 @@ pub trait IncompleteRequest: Sized {
/// Attempt to convert this request into its complete variant. /// Attempt to convert this request into its complete variant.
/// Will succeed if all fields have been filled, will fail otherwise. /// Will succeed if all fields have been filled, will fail otherwise.
fn complete(self) -> Result<Self::Complete, NoSuchOutput>; fn complete(self) -> Result<Self::Complete, NoSuchOutput>;
/// Adjust back-reference request indices.
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize;
}
/// A request which can be checked against its response for more validity.
pub trait CheckedRequest: IncompleteRequest {
/// Data extracted during the check.
type Extract;
/// Error encountered during the check.
type Error;
/// Environment passed to response check.
type Environment;
/// Check whether the response matches (beyond the type).
fn check_response(&self, &Self::Environment, &Self::Response) -> Result<Self::Extract, Self::Error>;
}
/// A response-like object.
///
/// These contain re-usable outputs.
pub trait ResponseLike {
/// Write all re-usable outputs into the provided function.
fn fill_outputs<F>(&self, output_store: F) where F: FnMut(usize, Output);
} }
/// Header request. /// Header request.
@ -555,6 +624,7 @@ pub mod header {
impl super::IncompleteRequest for Incomplete { impl super::IncompleteRequest for Incomplete {
type Complete = Complete; type Complete = Complete;
type Response = Response;
fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput> fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput>
where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput>
@ -586,6 +656,10 @@ pub mod header {
reverse: self.reverse, reverse: self.reverse,
}) })
} }
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
self.start.adjust_req(mapping)
}
} }
/// A complete header request. /// A complete header request.
@ -608,9 +682,9 @@ pub mod header {
pub headers: Vec<encoded::Header>, pub headers: Vec<encoded::Header>,
} }
impl Response { impl super::ResponseLike for Response {
/// Fill reusable outputs by writing them into the function. /// Fill reusable outputs by writing them into the function.
pub fn fill_outputs<F>(&self, _: F) where F: FnMut(usize, Output) { } fn fill_outputs<F>(&self, _: F) where F: FnMut(usize, Output) { }
} }
impl Decodable for Response { impl Decodable for Response {
@ -671,6 +745,7 @@ pub mod header_proof {
impl super::IncompleteRequest for Incomplete { impl super::IncompleteRequest for Incomplete {
type Complete = Complete; type Complete = Complete;
type Response = Response;
fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput> fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput>
where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput>
@ -699,6 +774,10 @@ pub mod header_proof {
num: self.num.into_scalar()?, num: self.num.into_scalar()?,
}) })
} }
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
self.num.adjust_req(mapping)
}
} }
/// A complete header proof request. /// A complete header proof request.
@ -719,9 +798,9 @@ pub mod header_proof {
pub td: U256, pub td: U256,
} }
impl Response { impl super::ResponseLike for Response {
/// Fill reusable outputs by providing them to the function. /// Fill reusable outputs by providing them to the function.
pub fn fill_outputs<F>(&self, mut f: F) where F: FnMut(usize, Output) { fn fill_outputs<F>(&self, mut f: F) where F: FnMut(usize, Output) {
f(0, Output::Hash(self.hash)); f(0, Output::Hash(self.hash));
} }
} }
@ -776,6 +855,7 @@ pub mod block_receipts {
impl super::IncompleteRequest for Incomplete { impl super::IncompleteRequest for Incomplete {
type Complete = Complete; type Complete = Complete;
type Response = Response;
fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput> fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput>
where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput>
@ -802,6 +882,10 @@ pub mod block_receipts {
hash: self.hash.into_scalar()?, hash: self.hash.into_scalar()?,
}) })
} }
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
self.hash.adjust_req(mapping)
}
} }
/// A complete block receipts request. /// A complete block receipts request.
@ -818,9 +902,9 @@ pub mod block_receipts {
pub receipts: Vec<Receipt> pub receipts: Vec<Receipt>
} }
impl Response { impl super::ResponseLike for Response {
/// Fill reusable outputs by providing them to the function. /// Fill reusable outputs by providing them to the function.
pub fn fill_outputs<F>(&self, _: F) where F: FnMut(usize, Output) {} fn fill_outputs<F>(&self, _: F) where F: FnMut(usize, Output) {}
} }
impl Decodable for Response { impl Decodable for Response {
@ -868,6 +952,7 @@ pub mod block_body {
impl super::IncompleteRequest for Incomplete { impl super::IncompleteRequest for Incomplete {
type Complete = Complete; type Complete = Complete;
type Response = Response;
fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput> fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput>
where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput>
@ -894,6 +979,10 @@ pub mod block_body {
hash: self.hash.into_scalar()?, hash: self.hash.into_scalar()?,
}) })
} }
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
self.hash.adjust_req(mapping)
}
} }
/// A complete block body request. /// A complete block body request.
@ -910,9 +999,9 @@ pub mod block_body {
pub body: encoded::Body, pub body: encoded::Body,
} }
impl Response { impl super::ResponseLike for Response {
/// Fill reusable outputs by providing them to the function. /// Fill reusable outputs by providing them to the function.
pub fn fill_outputs<F>(&self, _: F) where F: FnMut(usize, Output) {} fn fill_outputs<F>(&self, _: F) where F: FnMut(usize, Output) {}
} }
impl Decodable for Response { impl Decodable for Response {
@ -971,6 +1060,7 @@ pub mod account {
impl super::IncompleteRequest for Incomplete { impl super::IncompleteRequest for Incomplete {
type Complete = Complete; type Complete = Complete;
type Response = Response;
fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput> fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput>
where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput>
@ -1013,6 +1103,11 @@ pub mod account {
address_hash: self.address_hash.into_scalar()?, address_hash: self.address_hash.into_scalar()?,
}) })
} }
fn adjust_refs<F>(&mut self, mut mapping: F) where F: FnMut(usize) -> usize {
self.block_hash.adjust_req(&mut mapping);
self.address_hash.adjust_req(&mut mapping);
}
} }
/// A complete request for an account. /// A complete request for an account.
@ -1039,9 +1134,9 @@ pub mod account {
pub storage_root: H256, pub storage_root: H256,
} }
impl Response { impl super::ResponseLike for Response {
/// Fill reusable outputs by providing them to the function. /// Fill reusable outputs by providing them to the function.
pub fn fill_outputs<F>(&self, mut f: F) where F: FnMut(usize, Output) { fn fill_outputs<F>(&self, mut f: F) where F: FnMut(usize, Output) {
f(0, Output::Hash(self.code_hash)); f(0, Output::Hash(self.code_hash));
f(1, Output::Hash(self.storage_root)); f(1, Output::Hash(self.storage_root));
} }
@ -1109,6 +1204,7 @@ pub mod storage {
impl super::IncompleteRequest for Incomplete { impl super::IncompleteRequest for Incomplete {
type Complete = Complete; type Complete = Complete;
type Response = Response;
fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput> fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput>
where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput>
@ -1162,6 +1258,12 @@ pub mod storage {
key_hash: self.key_hash.into_scalar()?, key_hash: self.key_hash.into_scalar()?,
}) })
} }
fn adjust_refs<F>(&mut self, mut mapping: F) where F: FnMut(usize) -> usize {
self.block_hash.adjust_req(&mut mapping);
self.address_hash.adjust_req(&mut mapping);
self.key_hash.adjust_req(&mut mapping);
}
} }
/// A complete request for a storage proof. /// A complete request for a storage proof.
@ -1184,9 +1286,9 @@ pub mod storage {
pub value: H256, pub value: H256,
} }
impl Response { impl super::ResponseLike for Response {
/// Fill reusable outputs by providing them to the function. /// Fill reusable outputs by providing them to the function.
pub fn fill_outputs<F>(&self, mut f: F) where F: FnMut(usize, Output) { fn fill_outputs<F>(&self, mut f: F) where F: FnMut(usize, Output) {
f(0, Output::Hash(self.value)); f(0, Output::Hash(self.value));
} }
} }
@ -1243,6 +1345,7 @@ pub mod contract_code {
impl super::IncompleteRequest for Incomplete { impl super::IncompleteRequest for Incomplete {
type Complete = Complete; type Complete = Complete;
type Response = Response;
fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput> fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput>
where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput>
@ -1281,6 +1384,11 @@ pub mod contract_code {
code_hash: self.code_hash.into_scalar()?, code_hash: self.code_hash.into_scalar()?,
}) })
} }
fn adjust_refs<F>(&mut self, mut mapping: F) where F: FnMut(usize) -> usize {
self.block_hash.adjust_req(&mut mapping);
self.code_hash.adjust_req(&mut mapping);
}
} }
/// A complete request. /// A complete request.
@ -1299,9 +1407,9 @@ pub mod contract_code {
pub code: Bytes, pub code: Bytes,
} }
impl Response { impl super::ResponseLike for Response {
/// Fill reusable outputs by providing them to the function. /// Fill reusable outputs by providing them to the function.
pub fn fill_outputs<F>(&self, _: F) where F: FnMut(usize, Output) {} fn fill_outputs<F>(&self, _: F) where F: FnMut(usize, Output) {}
} }
impl Decodable for Response { impl Decodable for Response {
@ -1380,6 +1488,7 @@ pub mod execution {
impl super::IncompleteRequest for Incomplete { impl super::IncompleteRequest for Incomplete {
type Complete = Complete; type Complete = Complete;
type Response = Response;
fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput> fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput>
where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput>
@ -1412,6 +1521,10 @@ pub mod execution {
data: self.data, data: self.data,
}) })
} }
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
self.block_hash.adjust_req(mapping);
}
} }
/// A complete request. /// A complete request.
@ -1440,9 +1553,9 @@ pub mod execution {
pub items: Vec<DBValue>, pub items: Vec<DBValue>,
} }
impl Response { impl super::ResponseLike for Response {
/// Fill reusable outputs by providing them to the function. /// Fill reusable outputs by providing them to the function.
pub fn fill_outputs<F>(&self, _: F) where F: FnMut(usize, Output) {} fn fill_outputs<F>(&self, _: F) where F: FnMut(usize, Output) {}
} }
impl Decodable for Response { impl Decodable for Response {

View File

@ -17,12 +17,14 @@
//! Block header. //! Block header.
use util::*; use util::*;
use basic_types::{LogBloom, Seal, ZERO_LOGBLOOM}; use basic_types::{LogBloom, ZERO_LOGBLOOM};
use time::get_time; use time::get_time;
use rlp::*; use rlp::*;
use std::cell::RefCell; use std::cell::RefCell;
pub use basic_types::Seal;
/// Type for Block number /// Type for Block number
pub type BlockNumber = u64; pub type BlockNumber = u64;

View File

@ -67,6 +67,7 @@ impl IoHandler<ClientIoMessage> for QueueCull {
let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone()); let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone());
let best_header = self.client.best_block_header(); let best_header = self.client.best_block_header();
let start_nonce = self.client.engine().account_start_nonce();
info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len()); info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len());
self.remote.spawn_with_timeout(move || { self.remote.spawn_with_timeout(move || {
@ -74,7 +75,10 @@ impl IoHandler<ClientIoMessage> for QueueCull {
// fetch the nonce of each sender in the queue. // fetch the nonce of each sender in the queue.
let nonce_futures = senders.iter() let nonce_futures = senders.iter()
.map(|&address| request::Account { header: best_header.clone(), address: address }) .map(|&address| request::Account { header: best_header.clone(), address: address })
.map(|request| on_demand.account(ctx, request).map(|acc| acc.nonce)) .map(move |request| {
on_demand.account(ctx, request)
.map(move |maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce))
})
.zip(senders.iter()) .zip(senders.iter())
.map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce))); .map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce)));

View File

@ -228,8 +228,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
} }
// start on_demand service. // start on_demand service.
let account_start_nonce = service.client().engine().account_start_nonce(); let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone()));
let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone(), account_start_nonce));
// set network path. // set network path.
net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned()); net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned());

View File

@ -281,6 +281,7 @@ impl LightDispatcher {
} }
let best_header = self.client.best_block_header(); let best_header = self.client.best_block_header();
let account_start_nonce = self.client.engine().account_start_nonce();
let nonce_future = self.sync.with_context(|ctx| self.on_demand.account(ctx, request::Account { let nonce_future = self.sync.with_context(|ctx| self.on_demand.account(ctx, request::Account {
header: best_header, header: best_header,
address: addr, address: addr,
@ -288,7 +289,7 @@ impl LightDispatcher {
match nonce_future { match nonce_future {
Some(x) => Some(x) =>
x.map(|acc| acc.nonce) x.map(move |acc| acc.map_or(account_start_nonce, |acc| acc.nonce))
.map_err(|_| errors::no_light_peers()) .map_err(|_| errors::no_light_peers())
.boxed(), .boxed(),
None => future::err(errors::network_disabled()).boxed() None => future::err(errors::network_disabled()).boxed()

View File

@ -57,16 +57,16 @@ pub type ExecutionResult = Result<Executed, ExecutionError>;
impl LightFetch { impl LightFetch {
/// Get a block header from the on demand service or client, or error. /// Get a block header from the on demand service or client, or error.
pub fn header(&self, id: BlockId) -> BoxFuture<Option<encoded::Header>, Error> { pub fn header(&self, id: BlockId) -> BoxFuture<encoded::Header, Error> {
if let Some(h) = self.client.block_header(id) { if let Some(h) = self.client.block_header(id) {
return future::ok(Some(h)).boxed() return future::ok(h).boxed()
} }
let maybe_future = match id { let maybe_future = match id {
BlockId::Number(n) => { BlockId::Number(n) => {
let cht_root = cht::block_to_cht_number(n).and_then(|cn| self.client.cht_root(cn as usize)); let cht_root = cht::block_to_cht_number(n).and_then(|cn| self.client.cht_root(cn as usize));
match cht_root { match cht_root {
None => return future::ok(None).boxed(), None => return future::err(errors::unknown_block()).boxed(),
Some(root) => { Some(root) => {
let req = request::HeaderProof::new(n, root) let req = request::HeaderProof::new(n, root)
.expect("only fails for 0; client always stores genesis; client already queried; qed"); .expect("only fails for 0; client always stores genesis; client already queried; qed");
@ -82,7 +82,7 @@ impl LightFetch {
Some(fut) => fut.map_err(errors::on_demand_cancel).boxed(), Some(fut) => fut.map_err(errors::on_demand_cancel).boxed(),
None => future::err(errors::network_disabled()).boxed(), None => future::err(errors::network_disabled()).boxed(),
} }
}).map(Some).boxed() }).boxed()
}) })
} }
} }
@ -91,7 +91,7 @@ impl LightFetch {
self.sync.with_context(|ctx| self.sync.with_context(|ctx|
self.on_demand.header_by_hash(ctx, request::HeaderByHash(h)) self.on_demand.header_by_hash(ctx, request::HeaderByHash(h))
.then(|res| future::done(match res { .then(|res| future::done(match res {
Ok(h) => Ok(Some(h)), Ok(h) => Ok(h),
Err(e) => Err(errors::on_demand_cancel(e)), Err(e) => Err(errors::on_demand_cancel(e)),
})) }))
.boxed() .boxed()
@ -106,22 +106,21 @@ impl LightFetch {
} }
} }
// Get account info at a given block. `None` signifies no such account existing. /// helper for getting account info at a given block.
/// `None` indicates the account doesn't exist at the given block.
pub fn account(&self, address: Address, id: BlockId) -> BoxFuture<Option<BasicAccount>, Error> { pub fn account(&self, address: Address, id: BlockId) -> BoxFuture<Option<BasicAccount>, Error> {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.header(id).and_then(move |header| { self.header(id).and_then(move |header| {
let header = match header { let maybe_fut = sync.with_context(|ctx| on_demand.account(ctx, request::Account {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
sync.with_context(|ctx| on_demand.account(ctx, request::Account {
header: header, header: header,
address: address, address: address,
}).map(Some)) }));
.map(|x| x.map_err(errors::on_demand_cancel).boxed())
.unwrap_or_else(|| future::err(errors::network_disabled()).boxed()) match maybe_fut {
Some(fut) => fut.map_err(errors::on_demand_cancel).boxed(),
None => future::err(errors::network_disabled()).boxed(),
}
}).boxed() }).boxed()
} }
@ -176,10 +175,11 @@ impl LightFetch {
}).join(header_fut).and_then(move |(tx, hdr)| { }).join(header_fut).and_then(move |(tx, hdr)| {
// then request proved execution. // then request proved execution.
// TODO: get last-hashes from network. // TODO: get last-hashes from network.
let (env_info, hdr) = match (client.env_info(id), hdr) { let env_info = match client.env_info(id) {
(Some(env_info), Some(hdr)) => (env_info, hdr), Some(env_info) => env_info,
_ => return future::err(errors::unknown_block()).boxed(), _ => return future::err(errors::unknown_block()).boxed(),
}; };
let request = request::TransactionProof { let request = request::TransactionProof {
tx: tx, tx: tx,
header: hdr, header: hdr,
@ -198,18 +198,13 @@ impl LightFetch {
}).boxed() }).boxed()
} }
/// Get a block. /// get a block itself. fails on unknown block ID.
pub fn block(&self, id: BlockId) -> BoxFuture<Option<encoded::Block>, Error> { pub fn block(&self, id: BlockId) -> BoxFuture<encoded::Block, Error> {
let (on_demand, sync) = (self.on_demand.clone(), self.sync.clone()); let (on_demand, sync) = (self.on_demand.clone(), self.sync.clone());
self.header(id).and_then(move |hdr| { self.header(id).map(request::Body::new).and_then(move |req| {
let req = match hdr {
Some(hdr) => request::Body::new(hdr),
None => return future::ok(None).boxed(),
};
match sync.with_context(move |ctx| on_demand.block(ctx, req)) { match sync.with_context(move |ctx| on_demand.block(ctx, req)) {
Some(fut) => fut.map_err(errors::on_demand_cancel).map(Some).boxed(), Some(fut) => fut.map_err(errors::on_demand_cancel).boxed(),
None => future::err(errors::network_disabled()).boxed(), None => future::err(errors::network_disabled()).boxed(),
} }
}).boxed() }).boxed()

View File

@ -115,12 +115,11 @@ impl EthClient {
on_demand: self.on_demand.clone(), on_demand: self.on_demand.clone(),
sync: self.sync.clone(), sync: self.sync.clone(),
cache: self.cache.clone(), cache: self.cache.clone(),
} }
} }
// get a "rich" block structure // get a "rich" block structure. Fails on unknown block.
fn rich_block(&self, id: BlockId, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> { fn rich_block(&self, id: BlockId, include_txs: bool) -> BoxFuture<RichBlock, Error> {
let (on_demand, sync) = (self.on_demand.clone(), self.sync.clone()); let (on_demand, sync) = (self.on_demand.clone(), self.sync.clone());
let (client, engine) = (self.client.clone(), self.client.engine().clone()); let (client, engine) = (self.client.clone(), self.client.engine().clone());
let eip86_transition = self.client.eip86_transition(); let eip86_transition = self.client.eip86_transition();
@ -160,19 +159,16 @@ impl EthClient {
}; };
// get the block itself. // get the block itself.
self.fetcher().block(id).and_then(move |block| match block { self.fetcher().block(id).and_then(move |block| {
None => return future::ok(None).boxed(),
Some(block) => {
// then fetch the total difficulty (this is much easier after getting the block). // then fetch the total difficulty (this is much easier after getting the block).
match client.score(id) { match client.score(id) {
Some(score) => future::ok(fill_rich(block, Some(score))).map(Some).boxed(), Some(score) => future::ok(fill_rich(block, Some(score))).boxed(),
None => { None => {
// make a CHT request to fetch the chain score. // make a CHT request to fetch the chain score.
let req = cht::block_to_cht_number(block.number()) let req = cht::block_to_cht_number(block.number())
.and_then(|num| client.cht_root(num as usize)) .and_then(|num| client.cht_root(num as usize))
.and_then(|root| request::HeaderProof::new(block.number(), root)); .and_then(|root| request::HeaderProof::new(block.number(), root));
let req = match req { let req = match req {
Some(req) => req, Some(req) => req,
None => { None => {
@ -182,14 +178,14 @@ impl EthClient {
.expect("genesis always stored; qed") .expect("genesis always stored; qed")
.difficulty(); .difficulty();
return future::ok(fill_rich(block, Some(score))).map(Some).boxed() return future::ok(fill_rich(block, Some(score))).boxed()
} }
}; };
// three possible outcomes: // three possible outcomes:
// - network is down. // - network is down.
// - we get a score, but our hash is non-canonical. // - we get a score, but our hash is non-canonical.
// - we get ascore, and our hash is canonical. // - we get a score, and our hash is canonical.
let maybe_fut = sync.with_context(move |ctx| on_demand.hash_and_score_by_number(ctx, req)); let maybe_fut = sync.with_context(move |ctx| on_demand.hash_and_score_by_number(ctx, req));
match maybe_fut { match maybe_fut {
Some(fut) => fut.map(move |(hash, score)| { Some(fut) => fut.map(move |(hash, score)| {
@ -199,13 +195,12 @@ impl EthClient {
None None
}; };
Some(fill_rich(block, score)) fill_rich(block, score)
}).map_err(errors::on_demand_cancel).boxed(), }).map_err(errors::on_demand_cancel).boxed(),
None => return future::err(errors::network_disabled()).boxed(), None => return future::err(errors::network_disabled()).boxed(),
} }
} }
} }
}
}).boxed() }).boxed()
} }
} }
@ -281,11 +276,11 @@ impl Eth for EthClient {
} }
fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> { fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> {
self.rich_block(BlockId::Hash(hash.into()), include_txs) self.rich_block(BlockId::Hash(hash.into()), include_txs).map(Some).boxed()
} }
fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> { fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> {
self.rich_block(num.into(), include_txs) self.rich_block(num.into(), include_txs).map(Some).boxed()
} }
fn transaction_count(&self, address: RpcH160, num: Trailing<BlockNumber>) -> BoxFuture<RpcU256, Error> { fn transaction_count(&self, address: RpcH160, num: Trailing<BlockNumber>) -> BoxFuture<RpcU256, Error> {
@ -297,11 +292,6 @@ impl Eth for EthClient {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.fetcher().header(BlockId::Hash(hash.into())).and_then(move |hdr| { self.fetcher().header(BlockId::Hash(hash.into())).and_then(move |hdr| {
let hdr = match hdr {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
if hdr.transactions_root() == SHA3_NULL_RLP { if hdr.transactions_root() == SHA3_NULL_RLP {
future::ok(Some(U256::from(0).into())).boxed() future::ok(Some(U256::from(0).into())).boxed()
} else { } else {
@ -317,11 +307,6 @@ impl Eth for EthClient {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.fetcher().header(num.into()).and_then(move |hdr| { self.fetcher().header(num.into()).and_then(move |hdr| {
let hdr = match hdr {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
if hdr.transactions_root() == SHA3_NULL_RLP { if hdr.transactions_root() == SHA3_NULL_RLP {
future::ok(Some(U256::from(0).into())).boxed() future::ok(Some(U256::from(0).into())).boxed()
} else { } else {
@ -337,11 +322,6 @@ impl Eth for EthClient {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.fetcher().header(BlockId::Hash(hash.into())).and_then(move |hdr| { self.fetcher().header(BlockId::Hash(hash.into())).and_then(move |hdr| {
let hdr = match hdr {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP { if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP {
future::ok(Some(U256::from(0).into())).boxed() future::ok(Some(U256::from(0).into())).boxed()
} else { } else {
@ -357,11 +337,6 @@ impl Eth for EthClient {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.fetcher().header(num.into()).and_then(move |hdr| { self.fetcher().header(num.into()).and_then(move |hdr| {
let hdr = match hdr {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP { if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP {
future::ok(Some(U256::from(0).into())).boxed() future::ok(Some(U256::from(0).into())).boxed()
} else { } else {

View File

@ -360,7 +360,7 @@ impl Parity for ParityClient {
}) })
} }
fn block_header(&self, number: Trailing<BlockNumber>) -> BoxFuture<Option<RichHeader>, Error> { fn block_header(&self, number: Trailing<BlockNumber>) -> BoxFuture<RichHeader, Error> {
use ethcore::encoded; use ethcore::encoded;
let engine = self.light_dispatch.client.engine().clone(); let engine = self.light_dispatch.client.engine().clone();
@ -391,7 +391,7 @@ impl Parity for ParityClient {
} }
}; };
self.fetcher().header(number.0.into()).map(move |encoded| encoded.map(from_encoded)).boxed() self.fetcher().header(number.0.into()).map(from_encoded).boxed()
} }
fn ipfs_cid(&self, content: Bytes) -> Result<String, Error> { fn ipfs_cid(&self, content: Bytes) -> Result<String, Error> {

View File

@ -400,17 +400,17 @@ impl<C, M, S: ?Sized, U> Parity for ParityClient<C, M, S, U> where
}) })
} }
fn block_header(&self, number: Trailing<BlockNumber>) -> BoxFuture<Option<RichHeader>, Error> { fn block_header(&self, number: Trailing<BlockNumber>) -> BoxFuture<RichHeader, Error> {
const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed"; const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed";
let client = take_weakf!(self.client); let client = take_weakf!(self.client);
let id: BlockId = number.0.into(); let id: BlockId = number.0.into();
let encoded = match client.block_header(id.clone()) { let encoded = match client.block_header(id.clone()) {
Some(encoded) => encoded, Some(encoded) => encoded,
None => return future::ok(None).boxed(), None => return future::err(errors::unknown_block()).boxed(),
}; };
future::ok(Some(RichHeader { future::ok(RichHeader {
inner: Header { inner: Header {
hash: Some(encoded.hash().into()), hash: Some(encoded.hash().into()),
size: Some(encoded.rlp().as_raw().len().into()), size: Some(encoded.rlp().as_raw().len().into()),
@ -431,7 +431,7 @@ impl<C, M, S: ?Sized, U> Parity for ParityClient<C, M, S, U> where
extra_data: Bytes::new(encoded.extra_data()), extra_data: Bytes::new(encoded.extra_data()),
}, },
extra_info: client.block_extra_info(id).expect(EXTRA_INFO_PROOF), extra_info: client.block_extra_info(id).expect(EXTRA_INFO_PROOF),
})).boxed() }).boxed()
} }
fn ipfs_cid(&self, content: Bytes) -> Result<String, Error> { fn ipfs_cid(&self, content: Bytes) -> Result<String, Error> {

View File

@ -202,7 +202,7 @@ build_rpc_trait! {
/// Get block header. /// Get block header.
/// Same as `eth_getBlockByNumber` but without uncles and transactions. /// Same as `eth_getBlockByNumber` but without uncles and transactions.
#[rpc(async, name = "parity_getBlockHeaderByNumber")] #[rpc(async, name = "parity_getBlockHeaderByNumber")]
fn block_header(&self, Trailing<BlockNumber>) -> BoxFuture<Option<RichHeader>, Error>; fn block_header(&self, Trailing<BlockNumber>) -> BoxFuture<RichHeader, Error>;
/// Get IPFS CIDv0 given protobuf encoded bytes. /// Get IPFS CIDv0 given protobuf encoded bytes.
#[rpc(name = "parity_cidV0")] #[rpc(name = "parity_cidV0")]