From 01977e60aa6a8b771ce50201de6abef09386204a Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 3 Jan 2017 19:13:40 +0100 Subject: [PATCH] finish request module, basic dispatch --- ethcore/light/src/on_demand/mod.rs | 381 +++++++++++-------------- ethcore/light/src/on_demand/request.rs | 50 +++- 2 files changed, 198 insertions(+), 233 deletions(-) diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index d020ec8da..d6e2b59c0 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -28,7 +28,7 @@ use futures::sync::oneshot; use network::PeerId; use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; -use util::{Address, H256, U256, RwLock}; +use util::{Bytes, H256, U256, RwLock}; use types::les_request::{self as les_request, Request as LesRequest}; pub mod request; @@ -48,6 +48,7 @@ pub struct Account { } /// Errors which can occur while trying to fulfill a request. +#[derive(Debug, Clone, Copy)] pub enum Error { /// Request was canceled. Canceled, @@ -88,10 +89,10 @@ struct Peer { } // Attempted request info and sender to put received value. -enum Attempted { +enum Pending { HeaderByNumber(request::HeaderByNumber, Sender), // num + CHT root HeaderByHash(request::HeaderByHash, Sender), - Block(request::Block, Sender), + Block(request::Body, Sender), BlockReceipts(request::BlockReceipts, Sender>), Account(request::Account, Sender), } @@ -101,257 +102,195 @@ enum Attempted { /// requests to them accordingly. pub struct OnDemand { peers: RwLock>, - pending_requests: RwLock>, + pending_requests: RwLock>, } impl OnDemand { /// Request a header by block number and CHT root hash. - pub fn header_by_number(&self, ctx: &BasicContext, num: u64, cht_root: H256) -> Response { + pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Response { let (sender, receiver) = oneshot::channel(); - self.dispatch_request(ctx, Pending::HeaderByNumber(num, cht_root, sender)); + let num = req.num; + let cht_num = ::client::cht::block_to_cht_number(req.num); + let les_req = LesRequest::HeaderProofs(les_request::HeaderProofs { + requests: vec![les_request::HeaderProof { + cht_number: cht_num, + block_number: req.num, + from_level: 0, + }], + }); + + // we're looking for a peer with serveHeaders who's far enough along in the + // chain. + for (id, peer) in self.peers.read().iter() { + if peer.capabilities.serve_headers && peer.status.head_num >= num { + match ctx.request_from(*id, les_req.clone()) { + Ok(req_id) => { + trace!(target: "on_demand", "Assigning request to peer {}", id); + self.pending_requests.write().insert( + req_id, + Pending::HeaderByNumber(req, sender) + ); + return Response(receiver); + }, + Err(e) => + trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), + } + } + } + + // TODO: retrying + trace!(target: "on_demand", "No suitable peer for request"); + sender.complete(Err(Error::NoPeersAvailable)); Response(receiver) } /// Request a header by hash. This is less accurate than by-number because we don't know /// where in the chain this header lies, and therefore can't find a peer who is supposed to have /// it as easily. - pub fn header_by_hash(&self, ctx: &BasicContext, hash: H256) -> Response { + pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Response { let (sender, receiver) = oneshot::channel(); - self.dispatch_request(ctx, Pending::HeaderByHash(hash, sender)); + let les_req = LesRequest::Headers(les_request::Headers { + start: req.0.into(), + max: 1, + skip: 0, + reverse: false, + }); + + // all we've got is a hash, so we'll just guess at peers who might have + // it randomly. + let mut potential_peers = self.peers.read().iter() + .filter(|&(_, peer)| peer.capabilities.serve_headers) + .map(|(id, _)| *id) + .collect::>(); + + let mut rng = ::rand::thread_rng(); + + ::rand::Rng::shuffle(&mut rng, &mut potential_peers); + + for id in potential_peers { + match ctx.request_from(id, les_req.clone()) { + Ok(req_id) => { + trace!(target: "on_demand", "Assigning request to peer {}", id); + self.pending_requests.write().insert( + req_id, + Pending::HeaderByHash(req, sender), + ); + return Response(receiver); + } + Err(e) => + trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), + } + } + + // TODO: retrying + trace!(target: "on_demand", "No suitable peer for request"); + sender.complete(Err(Error::NoPeersAvailable)); Response(receiver) } /// Request a block, given its header. Block bodies are requestable by hash only, /// and the header is required anyway to verify and complete the block body /// -- this just doesn't obscure the network query. - pub fn block(&self, ctx: &BasicContext, header: encoded::Header) -> Response { + pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Response { let (sender, receiver) = oneshot::channel(); - let hash = header.hash(); - self.dispatch_request(ctx, Pending::Block(header, hash, sender)); + let num = req.header.number(); + let les_req = LesRequest::Bodies(les_request::Bodies { + block_hashes: vec![req.hash], + }); + + // we're looking for a peer with serveChainSince(num) + for (id, peer) in self.peers.read().iter() { + if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) { + match ctx.request_from(*id, les_req.clone()) { + Ok(req_id) => { + trace!(target: "on_demand", "Assigning request to peer {}", id); + self.pending_requests.write().insert( + req_id, + Pending::Block(req, sender) + ); + return Response(receiver) + } + Err(e) => + trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), + } + } + } + + // TODO: retrying + trace!(target: "on_demand", "No suitable peer for request"); + sender.complete(Err(Error::NoPeersAvailable)); Response(receiver) } /// Request the receipts for a block. The header serves two purposes: /// provide the block hash to fetch receipts for, and for verification of the receipts root. - pub fn block_receipts(&self, ctx: &BasicContext, header: encoded::Header) -> Response> { + pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Response> { let (sender, receiver) = oneshot::channel(); - self.dispatch_request(ctx, Pending::BlockReceipts(header, sender)); + let num = req.0.number(); + let les_req = LesRequest::Receipts(les_request::Receipts { + block_hashes: vec![req.0.hash()], + }); + + // we're looking for a peer with serveChainSince(num) + for (id, peer) in self.peers.read().iter() { + if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) { + match ctx.request_from(*id, les_req.clone()) { + Ok(req_id) => { + trace!(target: "on_demand", "Assigning request to peer {}", id); + self.pending_requests.write().insert( + req_id, + Pending::BlockReceipts(req, sender) + ); + return Response(receiver) + } + Err(e) => + trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), + } + } + } + + // TODO: retrying + trace!(target: "on_demand", "No suitable peer for request"); + sender.complete(Err(Error::NoPeersAvailable)); Response(receiver) } /// Request an account by address and block header -- which gives a hash to query and a state root /// to verify against. - pub fn account(&self, ctx: &BasicContext, header: encoded::Header, address: Address) -> Response { + pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Response { let (sender, receiver) = oneshot::channel(); - self.dispatch_request(ctx, Pending::Account(header, address, sender)); - Response(receiver) - } + let num = req.header.number(); + let les_req = LesRequest::StateProofs(les_request::StateProofs { + requests: vec![les_request::StateProof { + block: req.header.hash(), + key1: ::util::Hashable::sha3(&req.address), + key2: None, + from_level: 0, + }], + }); - /// Request account storage value by block header, address, and key. - pub fn storage(&self, ctx: &BasicContext, header: encoded::Header, address: Address, key: H256) -> Response { - let (sender, receiver) = oneshot::channel(); - self.dispatch_request(ctx, Pending::Storage(header, address, key, sender)); - Response(receiver) - } - - // dispatch a request to a suitable peer. - // - // TODO: most of this will become obsolete with a PeerSearch utility (#3987) - fn dispatch_request(&self, ctx: &BasicContext, request: Request) { - match request { - Pending::HeaderByNumber(request::HeaderByNumber(num, cht_hash), sender) => { - let cht_num = ::client::cht::block_to_cht_number(num); - let req = LesRequest::HeaderProofs(les_request::HeaderProofs { - requests: vec![les_request::HeaderProof { - cht_number: cht_num, - block_number: num, - from_level: 0, - }], - }); - - // we're looking for a peer with serveHeaders who's far enough along in the - // chain. - for (id, peer) in self.peers.read().iter() { - if peer.capabilities.serve_headers && peer.status.head_num >= num { - match ctx.request_from(*id, req.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); - self.pending_requests.write().insert( - req_id, - Pending::HeaderByNumber(num, cht_hash, sender) - ); - return - }, - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } + // we're looking for a peer with serveStateSince(num) + for (id, peer) in self.peers.read().iter() { + if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) { + match ctx.request_from(*id, les_req.clone()) { + Ok(req_id) => { + trace!(target: "on_demand", "Assigning request to peer {}", id); + self.pending_requests.write().insert( + req_id, + Pending::Account(req, sender) + ); + return Response(receiver) } + Err(e) => + trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), } - - // TODO: retrying - trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); - } - Pending::HeaderByHash(hash, sender) => { - let req = LesRequest::Headers(les_request::Headers { - start: hash.into(), - max: 1, - skip: 0, - reverse: false, - }); - - // all we've got is a hash, so we'll just guess at peers who might have - // it randomly. - let mut potential_peers = self.peers.read().iter() - .filter(|&(_, peer)| peer.capabilities.serve_headers) - .map(|(id, _)| *id) - .collect::>(); - - let mut rng = ::rand::thread_rng(); - - ::rand::Rng::shuffle(&mut rng, &mut potential_peers); - - for id in potential_peers { - match ctx.request_from(id, req.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); - self.pending_requests.write().insert( - req_id, - Request::HeaderByHash(hash, sender), - ); - return - } - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } - } - // TODO: retrying - trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); - } - Pending::Block(header, hash, sender) => { - let num = header.number(); - let req = LesRequest::Bodies(les_request::Bodies { - block_hashes: vec![hash], - }); - - // we're looking for a peer with serveChainSince(num) - for (id, peer) in self.peers.read().iter() { - if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) { - match ctx.request_from(*id, req.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); - self.pending_requests.write().insert( - req_id, - Request::Block(header, hash, sender) - ); - return - } - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } - } - } - - // TODO: retrying - trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); - } - Pending::BlockReceipts(header, sender) => { - let num = header.number(); - let req = LesRequest::Receipts(les_request::Receipts { - block_hashes: vec![header.hash()], - }); - - // we're looking for a peer with serveChainSince(num) - for (id, peer) in self.peers.read().iter() { - if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) { - match ctx.request_from(*id, req.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); - self.pending_requests.write().insert( - req_id, - Request::BlockReceipts(header, sender) - ); - return - } - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } - } - } - - // TODO: retrying - trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); - } - Pending::Account(header, address, sender) => { - let num = header.number(); - let req = LesRequest::StateProofs(les_request::StateProofs { - requests: vec![les_request::StateProof { - block: header.hash(), - key1: ::util::Hashable::sha3(&address), - key2: None, - from_level: 0, - }], - }); - - // we're looking for a peer with serveStateSince(num) - for (id, peer) in self.peers.read().iter() { - if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) { - match ctx.request_from(*id, req.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); - self.pending_requests.write().insert( - req_id, - Request::Account(header, address, sender) - ); - return - } - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } - } - } - - // TODO: retrying - trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); - } - Pending::Storage(header, address, key, sender) => { - let num = header.number(); - let req = LesRequest::StateProofs(les_request::StateProofs { - requests: vec![les_request::StateProof { - block: header.hash(), - key1: ::util::Hashable::sha3(&address), - key2: Some(::util::Hashable::sha3(&key)), - from_level: 0, - }], - }); - - // we're looking for a peer with serveStateSince(num) - for (id, peer) in self.peers.read().iter() { - if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) { - match ctx.request_from(*id, req.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); - self.pending_requests.write().insert( - req_id, - Request::Storage(header, address, key, sender) - ); - return - } - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } - } - } - - // TODO: retrying - trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); } } + + // TODO: retrying + trace!(target: "on_demand", "No suitable peer for request"); + sender.complete(Err(Error::NoPeersAvailable)); + Response(receiver) } } @@ -366,7 +305,7 @@ impl Handler for OnDemand { for unfulfilled in unfulfilled { if let Some(pending) = self.pending_requests.write().remove(unfulfilled) { trace!(target: "on_demand", "Attempting to reassign dropped request"); - self.dispatch_request(ctx.as_basic(), pending); + unimplemented!() } } } @@ -387,8 +326,8 @@ impl Handler for OnDemand { }; match req { - Request::HeaderByNumber(num, cht_root, sender) => { - let (ref header, ref proof) = match proofs.get(0) { + Pending::HeaderByNumber(req, sender) => { + let &&(ref header, ref proof) = match proofs.get(0) { Some(ref x) => x, None => { ctx.disconnect_peer(peer); diff --git a/ethcore/light/src/on_demand/request.rs b/ethcore/light/src/on_demand/request.rs index b0bb8ab18..09846c2bf 100644 --- a/ethcore/light/src/on_demand/request.rs +++ b/ethcore/light/src/on_demand/request.rs @@ -1,8 +1,26 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Request types, verification, and verification errors. + use ethcore::encoded; use ethcore::receipt::Receipt; -use rlp::{RlpStream, Stream}; -use util::{Address, Bytes, HashDB, H256, U256}; +use rlp::{RlpStream, Stream, UntrustedRlp, View}; +use util::{Address, Bytes, HashDB, H256}; use util::memorydb::MemoryDB; use util::sha3::Hashable; use util::trie::{Trie, TrieDB, TrieError}; @@ -41,7 +59,7 @@ impl From> for Error { /// Request for a header by number. pub struct HeaderByNumber { /// The header's number. - pub num: u64 + pub num: u64, /// The root of the CHT containing this header. pub cht_root: H256, } @@ -50,12 +68,11 @@ impl HeaderByNumber { /// Check a response with a header and cht proof. pub fn check_response(&self, header: &[u8], proof: &[Bytes]) -> Result { use util::trie::{Trie, TrieDB}; - use rlp::{UntrustedRlp, View}; // check the proof let mut db = MemoryDB::new(); - for node in proof { db.insert(&node[..]) } + for node in proof { db.insert(&node[..]); } let key = ::rlp::encode(&self.num); let expected_hash: H256 = match TrieDB::new(&db, &self.cht_root).and_then(|t| t.get(&*key))? { @@ -107,12 +124,12 @@ impl Body { let uncles_hash = body_view.at(1)?.as_raw().sha3(); if uncles_hash != self.header.uncles_hash() { - return Err(Error::WrongHash(self.header.uncles_hash(), uncles_hash); + return Err(Error::WrongHash(self.header.uncles_hash(), uncles_hash)); } // concatenate the header and the body. let mut stream = RlpStream::new_list(3); - stream.append_raw(header.rlp().as_raw(), 1); + stream.append_raw(self.header.rlp().as_raw(), 1); stream.append_raw(body, 2); Ok(encoded::Block::new(stream.out())) @@ -145,14 +162,23 @@ pub struct Account { impl Account { /// Check a response with an account against the stored header. - pub fn check_response(&self, proof: &[Bytes]) -> Result, Error> { - let state_root = header.state_root(); + pub fn check_response(&self, proof: &[Bytes]) -> Result { + let state_root = self.header.state_root(); let mut db = MemoryDB::new(); - for node in proof { db.insert(&*node) } - - match TrieDB::new(&db, &state_root).and_then(|t| t.get(&address.sha3()))? { + for node in proof { db.insert(&node[..]); } + match TrieDB::new(&db, &state_root).and_then(|t| t.get(&self.address.sha3()))? { + Some(val) => { + let rlp = UntrustedRlp::new(&val); + Ok(BasicAccount { + nonce: rlp.val_at(0)?, + balance: rlp.val_at(1)?, + storage_root: rlp.val_at(2)?, + code_hash: rlp.val_at(3)?, + }) + }, + None => Err(Error::BadProof) } } }