finish request module, basic dispatch

This commit is contained in:
Robert Habermeier 2017-01-03 19:13:40 +01:00
parent 4dbbc3bc88
commit 01977e60aa
2 changed files with 198 additions and 233 deletions

View File

@ -28,7 +28,7 @@ use futures::sync::oneshot;
use network::PeerId;
use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId};
use util::{Address, H256, U256, RwLock};
use util::{Bytes, H256, U256, RwLock};
use types::les_request::{self as les_request, Request as LesRequest};
pub mod request;
@ -48,6 +48,7 @@ pub struct Account {
}
/// Errors which can occur while trying to fulfill a request.
#[derive(Debug, Clone, Copy)]
pub enum Error {
/// Request was canceled.
Canceled,
@ -88,10 +89,10 @@ struct Peer {
}
// Attempted request info and sender to put received value.
enum Attempted {
enum Pending {
HeaderByNumber(request::HeaderByNumber, Sender<encoded::Header>), // num + CHT root
HeaderByHash(request::HeaderByHash, Sender<encoded::Header>),
Block(request::Block, Sender<encoded::Block>),
Block(request::Body, Sender<encoded::Block>),
BlockReceipts(request::BlockReceipts, Sender<Vec<Receipt>>),
Account(request::Account, Sender<Account>),
}
@ -101,257 +102,195 @@ enum Attempted {
/// requests to them accordingly.
pub struct OnDemand {
peers: RwLock<HashMap<PeerId, Peer>>,
pending_requests: RwLock<HashMap<ReqId, Request>>,
pending_requests: RwLock<HashMap<ReqId, Pending>>,
}
impl OnDemand {
/// Request a header by block number and CHT root hash.
pub fn header_by_number(&self, ctx: &BasicContext, num: u64, cht_root: H256) -> Response<encoded::Header> {
pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Response<encoded::Header> {
let (sender, receiver) = oneshot::channel();
self.dispatch_request(ctx, Pending::HeaderByNumber(num, cht_root, sender));
let num = req.num;
let cht_num = ::client::cht::block_to_cht_number(req.num);
let les_req = LesRequest::HeaderProofs(les_request::HeaderProofs {
requests: vec![les_request::HeaderProof {
cht_number: cht_num,
block_number: req.num,
from_level: 0,
}],
});
// we're looking for a peer with serveHeaders who's far enough along in the
// chain.
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_headers && peer.status.head_num >= num {
match ctx.request_from(*id, les_req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Pending::HeaderByNumber(req, sender)
);
return Response(receiver);
},
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
Response(receiver)
}
/// Request a header by hash. This is less accurate than by-number because we don't know
/// where in the chain this header lies, and therefore can't find a peer who is supposed to have
/// it as easily.
pub fn header_by_hash(&self, ctx: &BasicContext, hash: H256) -> Response<encoded::Header> {
pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Response<encoded::Header> {
let (sender, receiver) = oneshot::channel();
self.dispatch_request(ctx, Pending::HeaderByHash(hash, sender));
let les_req = LesRequest::Headers(les_request::Headers {
start: req.0.into(),
max: 1,
skip: 0,
reverse: false,
});
// all we've got is a hash, so we'll just guess at peers who might have
// it randomly.
let mut potential_peers = self.peers.read().iter()
.filter(|&(_, peer)| peer.capabilities.serve_headers)
.map(|(id, _)| *id)
.collect::<Vec<_>>();
let mut rng = ::rand::thread_rng();
::rand::Rng::shuffle(&mut rng, &mut potential_peers);
for id in potential_peers {
match ctx.request_from(id, les_req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Pending::HeaderByHash(req, sender),
);
return Response(receiver);
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
Response(receiver)
}
/// Request a block, given its header. Block bodies are requestable by hash only,
/// and the header is required anyway to verify and complete the block body
/// -- this just doesn't obscure the network query.
pub fn block(&self, ctx: &BasicContext, header: encoded::Header) -> Response<encoded::Block> {
pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Response<encoded::Block> {
let (sender, receiver) = oneshot::channel();
let hash = header.hash();
self.dispatch_request(ctx, Pending::Block(header, hash, sender));
let num = req.header.number();
let les_req = LesRequest::Bodies(les_request::Bodies {
block_hashes: vec![req.hash],
});
// we're looking for a peer with serveChainSince(num)
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) {
match ctx.request_from(*id, les_req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Pending::Block(req, sender)
);
return Response(receiver)
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
Response(receiver)
}
/// Request the receipts for a block. The header serves two purposes:
/// provide the block hash to fetch receipts for, and for verification of the receipts root.
pub fn block_receipts(&self, ctx: &BasicContext, header: encoded::Header) -> Response<Vec<Receipt>> {
pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Response<Vec<Receipt>> {
let (sender, receiver) = oneshot::channel();
self.dispatch_request(ctx, Pending::BlockReceipts(header, sender));
let num = req.0.number();
let les_req = LesRequest::Receipts(les_request::Receipts {
block_hashes: vec![req.0.hash()],
});
// we're looking for a peer with serveChainSince(num)
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) {
match ctx.request_from(*id, les_req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Pending::BlockReceipts(req, sender)
);
return Response(receiver)
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
Response(receiver)
}
/// Request an account by address and block header -- which gives a hash to query and a state root
/// to verify against.
pub fn account(&self, ctx: &BasicContext, header: encoded::Header, address: Address) -> Response<Account> {
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Response<Account> {
let (sender, receiver) = oneshot::channel();
self.dispatch_request(ctx, Pending::Account(header, address, sender));
Response(receiver)
}
let num = req.header.number();
let les_req = LesRequest::StateProofs(les_request::StateProofs {
requests: vec![les_request::StateProof {
block: req.header.hash(),
key1: ::util::Hashable::sha3(&req.address),
key2: None,
from_level: 0,
}],
});
/// Request account storage value by block header, address, and key.
pub fn storage(&self, ctx: &BasicContext, header: encoded::Header, address: Address, key: H256) -> Response<H256> {
let (sender, receiver) = oneshot::channel();
self.dispatch_request(ctx, Pending::Storage(header, address, key, sender));
Response(receiver)
}
// dispatch a request to a suitable peer.
//
// TODO: most of this will become obsolete with a PeerSearch utility (#3987)
fn dispatch_request(&self, ctx: &BasicContext, request: Request) {
match request {
Pending::HeaderByNumber(request::HeaderByNumber(num, cht_hash), sender) => {
let cht_num = ::client::cht::block_to_cht_number(num);
let req = LesRequest::HeaderProofs(les_request::HeaderProofs {
requests: vec![les_request::HeaderProof {
cht_number: cht_num,
block_number: num,
from_level: 0,
}],
});
// we're looking for a peer with serveHeaders who's far enough along in the
// chain.
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_headers && peer.status.head_num >= num {
match ctx.request_from(*id, req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Pending::HeaderByNumber(num, cht_hash, sender)
);
return
},
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
// we're looking for a peer with serveStateSince(num)
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) {
match ctx.request_from(*id, les_req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Pending::Account(req, sender)
);
return Response(receiver)
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
}
Pending::HeaderByHash(hash, sender) => {
let req = LesRequest::Headers(les_request::Headers {
start: hash.into(),
max: 1,
skip: 0,
reverse: false,
});
// all we've got is a hash, so we'll just guess at peers who might have
// it randomly.
let mut potential_peers = self.peers.read().iter()
.filter(|&(_, peer)| peer.capabilities.serve_headers)
.map(|(id, _)| *id)
.collect::<Vec<_>>();
let mut rng = ::rand::thread_rng();
::rand::Rng::shuffle(&mut rng, &mut potential_peers);
for id in potential_peers {
match ctx.request_from(id, req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Request::HeaderByHash(hash, sender),
);
return
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
}
Pending::Block(header, hash, sender) => {
let num = header.number();
let req = LesRequest::Bodies(les_request::Bodies {
block_hashes: vec![hash],
});
// we're looking for a peer with serveChainSince(num)
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) {
match ctx.request_from(*id, req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Request::Block(header, hash, sender)
);
return
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
}
Pending::BlockReceipts(header, sender) => {
let num = header.number();
let req = LesRequest::Receipts(les_request::Receipts {
block_hashes: vec![header.hash()],
});
// we're looking for a peer with serveChainSince(num)
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) {
match ctx.request_from(*id, req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Request::BlockReceipts(header, sender)
);
return
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
}
Pending::Account(header, address, sender) => {
let num = header.number();
let req = LesRequest::StateProofs(les_request::StateProofs {
requests: vec![les_request::StateProof {
block: header.hash(),
key1: ::util::Hashable::sha3(&address),
key2: None,
from_level: 0,
}],
});
// we're looking for a peer with serveStateSince(num)
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) {
match ctx.request_from(*id, req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Request::Account(header, address, sender)
);
return
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
}
Pending::Storage(header, address, key, sender) => {
let num = header.number();
let req = LesRequest::StateProofs(les_request::StateProofs {
requests: vec![les_request::StateProof {
block: header.hash(),
key1: ::util::Hashable::sha3(&address),
key2: Some(::util::Hashable::sha3(&key)),
from_level: 0,
}],
});
// we're looking for a peer with serveStateSince(num)
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) {
match ctx.request_from(*id, req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Request::Storage(header, address, key, sender)
);
return
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
Response(receiver)
}
}
@ -366,7 +305,7 @@ impl Handler for OnDemand {
for unfulfilled in unfulfilled {
if let Some(pending) = self.pending_requests.write().remove(unfulfilled) {
trace!(target: "on_demand", "Attempting to reassign dropped request");
self.dispatch_request(ctx.as_basic(), pending);
unimplemented!()
}
}
}
@ -387,8 +326,8 @@ impl Handler for OnDemand {
};
match req {
Request::HeaderByNumber(num, cht_root, sender) => {
let (ref header, ref proof) = match proofs.get(0) {
Pending::HeaderByNumber(req, sender) => {
let &&(ref header, ref proof) = match proofs.get(0) {
Some(ref x) => x,
None => {
ctx.disconnect_peer(peer);

View File

@ -1,8 +1,26 @@
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Request types, verification, and verification errors.
use ethcore::encoded;
use ethcore::receipt::Receipt;
use rlp::{RlpStream, Stream};
use util::{Address, Bytes, HashDB, H256, U256};
use rlp::{RlpStream, Stream, UntrustedRlp, View};
use util::{Address, Bytes, HashDB, H256};
use util::memorydb::MemoryDB;
use util::sha3::Hashable;
use util::trie::{Trie, TrieDB, TrieError};
@ -41,7 +59,7 @@ impl From<Box<TrieError>> for Error {
/// Request for a header by number.
pub struct HeaderByNumber {
/// The header's number.
pub num: u64
pub num: u64,
/// The root of the CHT containing this header.
pub cht_root: H256,
}
@ -50,12 +68,11 @@ impl HeaderByNumber {
/// Check a response with a header and cht proof.
pub fn check_response(&self, header: &[u8], proof: &[Bytes]) -> Result<encoded::Header, Error> {
use util::trie::{Trie, TrieDB};
use rlp::{UntrustedRlp, View};
// check the proof
let mut db = MemoryDB::new();
for node in proof { db.insert(&node[..]) }
for node in proof { db.insert(&node[..]); }
let key = ::rlp::encode(&self.num);
let expected_hash: H256 = match TrieDB::new(&db, &self.cht_root).and_then(|t| t.get(&*key))? {
@ -107,12 +124,12 @@ impl Body {
let uncles_hash = body_view.at(1)?.as_raw().sha3();
if uncles_hash != self.header.uncles_hash() {
return Err(Error::WrongHash(self.header.uncles_hash(), uncles_hash);
return Err(Error::WrongHash(self.header.uncles_hash(), uncles_hash));
}
// concatenate the header and the body.
let mut stream = RlpStream::new_list(3);
stream.append_raw(header.rlp().as_raw(), 1);
stream.append_raw(self.header.rlp().as_raw(), 1);
stream.append_raw(body, 2);
Ok(encoded::Block::new(stream.out()))
@ -145,14 +162,23 @@ pub struct Account {
impl Account {
/// Check a response with an account against the stored header.
pub fn check_response(&self, proof: &[Bytes]) -> Result<Vec<Receipt>, Error> {
let state_root = header.state_root();
pub fn check_response(&self, proof: &[Bytes]) -> Result<BasicAccount, Error> {
let state_root = self.header.state_root();
let mut db = MemoryDB::new();
for node in proof { db.insert(&*node) }
match TrieDB::new(&db, &state_root).and_then(|t| t.get(&address.sha3()))? {
for node in proof { db.insert(&node[..]); }
match TrieDB::new(&db, &state_root).and_then(|t| t.get(&self.address.sha3()))? {
Some(val) => {
let rlp = UntrustedRlp::new(&val);
Ok(BasicAccount {
nonce: rlp.val_at(0)?,
balance: rlp.val_at(1)?,
storage_root: rlp.val_at(2)?,
code_hash: rlp.val_at(3)?,
})
},
None => Err(Error::BadProof)
}
}
}