From cbb9314531be7b2ea2c2deb41f9ffafd83da19ca Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 16 Mar 2017 20:23:59 +0100 Subject: [PATCH] use PIP messages in on_demand, old API --- Cargo.lock | 35 +- ethcore/light/src/lib.rs | 2 +- ethcore/light/src/on_demand/mod.rs | 610 ++++++++----------------- ethcore/light/src/on_demand/request.rs | 61 +-- ethcore/light/src/types/request/mod.rs | 4 +- 5 files changed, 227 insertions(+), 485 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6924cfe00..72d0b7778 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -445,7 +445,7 @@ dependencies = [ "ethcore-rpc 1.7.0", "ethcore-util 1.7.0", "fetch 0.1.0", - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.10.0-a.0 (git+https://github.com/ethcore/hyper)", "jsonrpc-core 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-http-server 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", @@ -554,7 +554,7 @@ dependencies = [ "ethcore-ipc-codegen 1.7.0", "ethcore-network 1.7.0", "ethcore-util 1.7.0", - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", @@ -620,7 +620,7 @@ dependencies = [ "ethstore 0.1.0", "ethsync 1.7.0", "fetch 0.1.0", - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-http-server 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-ipc-server 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", @@ -686,7 +686,7 @@ dependencies = [ "ethcore-ipc-codegen 1.7.0", "ethcore-ipc-nano 1.7.0", "ethcore-util 1.7.0", - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-macros 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-tcp-server 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", @@ -849,7 +849,7 @@ dependencies = [ name = "fetch" version = "0.1.0" dependencies = [ - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mime 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -868,11 +868,8 @@ dependencies = [ [[package]] name = "futures" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", -] [[package]] name = "futures-cpupool" @@ -880,7 +877,7 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1086,7 +1083,7 @@ name = "jsonrpc-core" version = "6.0.0" source = "git+https://github.com/ethcore/jsonrpc.git#86d7a89c85f324b5f6671315d9b71010ca995300" dependencies = [ - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1623,7 +1620,7 @@ dependencies = [ "ethabi 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-util 1.7.0", "fetch 0.1.0", - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mime 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "mime_guess 1.6.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1665,7 +1662,7 @@ dependencies = [ name = "parity-reactor" version = "0.1.0" dependencies = [ - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1676,7 +1673,7 @@ dependencies = [ "ethcore-rpc 1.7.0", "ethcore-signer 1.7.0", "ethcore-util 1.7.0", - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1994,7 +1991,7 @@ dependencies = [ "ethcore-bigint 0.1.2", "ethcore-rpc 1.7.0", "ethcore-util 1.7.0", - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "parity-rpc-client 1.4.0", "rpassword 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2345,7 +2342,7 @@ name = "tokio-core" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2357,7 +2354,7 @@ name = "tokio-proto" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2373,7 +2370,7 @@ name = "tokio-service" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2606,7 +2603,7 @@ dependencies = [ "checksum ethabi 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5d8f6cc4c1acd005f48e1d17b06a461adac8fb6eeeb331fbf19a0e656fba91cd" "checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa" "checksum flate2 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "3eeb481e957304178d2e782f2da1257f1434dfecbae883bafb61ada2a9fea3bb" -"checksum futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "c1913eb7083840b1bbcbf9631b7fda55eaf35fe7ead13cca034e8946f9e2bc41" +"checksum futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8e51e7f9c150ba7fd4cee9df8bf6ea3dea5b63b68955ddad19ccd35b71dcfb4d" "checksum futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bb982bb25cd8fa5da6a8eb3a460354c984ff1113da82bcb4f0b0862b5795db82" "checksum gcc 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)" = "91ecd03771effb0c968fd6950b37e89476a578aaf1c70297d8e92b6516ec3312" "checksum gdi32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0912515a8ff24ba900422ecda800b52f4016a56251922d397c576bf92c690518" diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index 81a974192..ada58d8de 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -35,7 +35,7 @@ pub mod client; pub mod cht; pub mod net; -//pub mod on_demand; +pub mod on_demand; pub mod transaction_queue; pub mod cache; diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index 25cde402b..df8a6c6a9 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -31,12 +31,12 @@ use futures::{Async, Poll, Future}; use futures::sync::oneshot::{self, Sender, Receiver}; use network::PeerId; use rlp::{RlpStream, Stream}; -use util::{Bytes, DBValue, RwLock, Mutex, U256}; +use util::{Bytes, DBValue, RwLock, Mutex, U256, H256}; use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP}; use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; use cache::Cache; -use types::les_request::{self as les_request, Request as LesRequest}; +use request::{self as basic_request, Request as NetworkRequest, Response as NetworkResponse}; pub mod request; @@ -46,24 +46,85 @@ struct Peer { capabilities: Capabilities, } +impl Peer { + // Whether a given peer can handle a specific request. + fn can_handle(&self, pending: &Pending) -> bool { + match *pending { + Pending::HeaderProof(ref req, _) => + self.capabilities.serve_headers && self.status.head_num > req.num(), + Pending::HeaderByHash(ref req, _) => self.capabilities.serve_headers, + Pending::Block(ref req, _) => + self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= req.header.number()), + Pending::BlockReceipts(ref req, _) => + self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= req.0.number()), + Pending::Account(ref req, _) => + self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= req.header.number()), + Pending::Code(ref req, _) => + self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= req.block_id.1), + Pending::TxProof(ref req, _) => + self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= req.header.number()), + } + } +} + // Which portions of a CHT proof should be sent. enum ChtProofSender { - Both(Sender<(encoded::Header, U256)>), - Header(Sender), + Both(Sender<(H256, U256)>), + Hash(Sender), ChainScore(Sender), } // Attempted request info and sender to put received value. enum Pending { - HeaderByNumber(request::HeaderByNumber, ChtProofSender), + HeaderProof(request::HeaderProof, ChtProofSender), HeaderByHash(request::HeaderByHash, Sender), Block(request::Body, Sender), BlockReceipts(request::BlockReceipts, Sender>), - Account(request::Account, Sender), + Account(request::Account, Sender>), Code(request::Code, Sender), TxProof(request::TransactionProof, Sender>), } +impl Pending { + // Create a network request. + fn make_request(&self) -> NetworkRequest { + match *self { + Pending::HeaderByHash(ref req, _) => NetworkRequest::Headers(basic_request::IncompleteHeadersRequest { + start: basic_request::HashOrNumber::Hash(req.0).into(), + skip: 0, + max: 1, + reverse: false, + }), + Pending::HeaderProof(ref req, _) => NetworkRequest::HeaderProof(basic_request::IncompleteHeaderProofRequest { + num: req.num().into(), + }), + Pending::Block(ref req, _) => NetworkRequest::Body(basic_request::IncompleteBodyRequest { + hash: req.hash.into(), + }), + Pending::BlockReceipts(ref req, _) => NetworkRequest::Receipts(basic_request::IncompleteReceiptsRequest { + hash: req.0.hash().into(), + }), + Pending::Account(ref req, _) => NetworkRequest::Account(basic_request::IncompleteAccountRequest { + block_hash: req.header.hash().into(), + address_hash: ::util::Hashable::sha3(&req.address).into(), + }), + Pending::Code(ref req, _) => NetworkRequest::Code(basic_request::IncompleteCodeRequest { + block_hash: req.block_id.0.into(), + code_hash: req.code_hash.into(), + }), + Pending::TxProof(ref req, _) => NetworkRequest::Execution(basic_request::IncompleteExecutionRequest { + block_hash: req.header.hash().into(), + from: req.tx.sender(), + gas: req.tx.gas, + gas_price: req.tx.gas_price, + action: req.tx.action.clone(), + value: req.tx.value, + data: req.tx.data.clone(), + }), + } + } +} + /// On demand request service. See module docs for more details. /// Accumulates info about all peers' capabilities and dispatches /// requests to them accordingly. @@ -85,25 +146,25 @@ impl OnDemand { } } - /// Request a header by block number and CHT root hash. - /// Returns the header. - pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver { + /// Request a header's hash by block number and CHT root hash. + /// Returns the hash. + pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver { let (sender, receiver) = oneshot::channel(); let cached = { let mut cache = self.cache.lock(); - cache.block_hash(&req.num()).and_then(|hash| cache.block_header(&hash)) + cache.block_hash(&req.num()) }; match cached { - Some(hdr) => sender.complete(hdr), - None => self.dispatch_header_by_number(ctx, req, ChtProofSender::Header(sender)), + Some(hash) => sender.complete(hash), + None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Hash(sender))), } receiver } /// Request a canonical block's chain score. /// Returns the chain score. - pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver { + pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver { let (sender, receiver) = oneshot::channel(); let cached = { let mut cache = self.cache.lock(); @@ -112,71 +173,33 @@ impl OnDemand { match cached { Some(score) => sender.complete(score), - None => self.dispatch_header_by_number(ctx, req, ChtProofSender::ChainScore(sender)), + None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::ChainScore(sender))), } receiver } - /// Request a canonical block's chain score. - /// Returns the header and chain score. - pub fn header_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<(encoded::Header, U256)> { + /// Request a canonical block's hash and chain score by number. + /// Returns the hash and chain score. + pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<(H256, U256)> { let (sender, receiver) = oneshot::channel(); let cached = { let mut cache = self.cache.lock(); let hash = cache.block_hash(&req.num()); ( - hash.clone().and_then(|hash| cache.block_header(&hash)), + hash.clone(), hash.and_then(|hash| cache.chain_score(&hash)), ) }; match cached { - (Some(hdr), Some(score)) => sender.complete((hdr, score)), - _ => self.dispatch_header_by_number(ctx, req, ChtProofSender::Both(sender)), + (Some(hash), Some(score)) => sender.complete((hash, score)), + _ => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Both(sender))), } receiver } - // dispatch the request, completing the request if no peers available. - fn dispatch_header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber, sender: ChtProofSender) { - let num = req.num(); - let cht_num = req.cht_num(); - - let les_req = LesRequest::HeaderProofs(les_request::HeaderProofs { - requests: vec![les_request::HeaderProof { - cht_number: cht_num, - block_number: num, - from_level: 0, - }], - }); - - let pending = Pending::HeaderByNumber(req, sender); - - // we're looking for a peer with serveHeaders who's far enough along in the - // chain. - for (id, peer) in self.peers.read().iter() { - if peer.capabilities.serve_headers && peer.status.head_num >= num { - match ctx.request_from(*id, les_req.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); - self.pending_requests.write().insert( - req_id, - pending, - ); - return - }, - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } - } - } - - trace!(target: "on_demand", "No suitable peer for request"); - self.orphaned_requests.write().push(pending) - } - /// Request a header by hash. This is less accurate than by-number because we don't know /// where in the chain this header lies, and therefore can't find a peer who is supposed to have /// it as easily. @@ -184,50 +207,11 @@ impl OnDemand { let (sender, receiver) = oneshot::channel(); match self.cache.lock().block_header(&req.0) { Some(hdr) => sender.complete(hdr), - None => self.dispatch_header_by_hash(ctx, req, sender), + None => self.dispatch(ctx, Pending::HeaderByHash(req, sender)), } receiver } - fn dispatch_header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash, sender: Sender) { - let les_req = LesRequest::Headers(les_request::Headers { - start: req.0.into(), - max: 1, - skip: 0, - reverse: false, - }); - - // all we've got is a hash, so we'll just guess at peers who might have - // it randomly. - let mut potential_peers = self.peers.read().iter() - .filter(|&(_, peer)| peer.capabilities.serve_headers) - .map(|(id, _)| *id) - .collect::>(); - - let mut rng = ::rand::thread_rng(); - ::rand::Rng::shuffle(&mut rng, &mut potential_peers); - - let pending = Pending::HeaderByHash(req, sender); - - for id in potential_peers { - match ctx.request_from(id, les_req.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); - self.pending_requests.write().insert( - req_id, - pending, - ); - return - } - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } - } - - trace!(target: "on_demand", "No suitable peer for request"); - self.orphaned_requests.write().push(pending) - } - /// Request a block, given its header. Block bodies are requestable by hash only, /// and the header is required anyway to verify and complete the block body /// -- this just doesn't obscure the network query. @@ -251,41 +235,12 @@ impl OnDemand { sender.complete(encoded::Block::new(stream.out())); } - None => self.dispatch_block(ctx, req, sender), + None => self.dispatch(ctx, Pending::Block(req, sender)), } } receiver } - fn dispatch_block(&self, ctx: &BasicContext, req: request::Body, sender: Sender) { - let num = req.header.number(); - let les_req = LesRequest::Bodies(les_request::Bodies { - block_hashes: vec![req.hash], - }); - let pending = Pending::Block(req, sender); - - // we're looking for a peer with serveChainSince(num) - for (id, peer) in self.peers.read().iter() { - if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) { - match ctx.request_from(*id, les_req.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); - self.pending_requests.write().insert( - req_id, - pending, - ); - return - } - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } - } - } - - trace!(target: "on_demand", "No suitable peer for request"); - self.orphaned_requests.write().push(pending) - } - /// Request the receipts for a block. The header serves two purposes: /// provide the block hash to fetch receipts for, and for verification of the receipts root. pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Receiver> { @@ -297,84 +252,21 @@ impl OnDemand { } else { match self.cache.lock().block_receipts(&req.0.hash()) { Some(receipts) => sender.complete(receipts), - None => self.dispatch_block_receipts(ctx, req, sender), + None => self.dispatch(ctx, Pending::BlockReceipts(req, sender)), } } receiver } - fn dispatch_block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts, sender: Sender>) { - let num = req.0.number(); - let les_req = LesRequest::Receipts(les_request::Receipts { - block_hashes: vec![req.0.hash()], - }); - let pending = Pending::BlockReceipts(req, sender); - - // we're looking for a peer with serveChainSince(num) - for (id, peer) in self.peers.read().iter() { - if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) { - match ctx.request_from(*id, les_req.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); - self.pending_requests.write().insert( - req_id, - pending, - ); - return - } - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } - } - } - - trace!(target: "on_demand", "No suitable peer for request"); - self.orphaned_requests.write().push(pending) - } - /// Request an account by address and block header -- which gives a hash to query and a state root /// to verify against. - pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver { + pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver> { let (sender, receiver) = oneshot::channel(); - self.dispatch_account(ctx, req, sender); + self.dispatch(ctx, Pending::Account(req, sender)); receiver } - fn dispatch_account(&self, ctx: &BasicContext, req: request::Account, sender: Sender) { - let num = req.header.number(); - let les_req = LesRequest::StateProofs(les_request::StateProofs { - requests: vec![les_request::StateProof { - block: req.header.hash(), - key1: ::util::Hashable::sha3(&req.address), - key2: None, - from_level: 0, - }], - }); - let pending = Pending::Account(req, sender); - - // we're looking for a peer with serveStateSince(num) - for (id, peer) in self.peers.read().iter() { - if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) { - match ctx.request_from(*id, les_req.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); - self.pending_requests.write().insert( - req_id, - pending, - ); - return - } - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } - } - } - - trace!(target: "on_demand", "No suitable peer for request"); - self.orphaned_requests.write().push(pending) - } - /// Request code by address, known code hash, and block header. pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Receiver { let (sender, receiver) = oneshot::channel(); @@ -383,88 +275,50 @@ impl OnDemand { if req.code_hash == ::util::sha3::SHA3_EMPTY { sender.complete(Vec::new()) } else { - self.dispatch_code(ctx, req, sender); + self.dispatch(ctx, Pending::Code(req, sender)); } receiver } - fn dispatch_code(&self, ctx: &BasicContext, req: request::Code, sender: Sender) { - let num = req.block_id.1; - let les_req = LesRequest::Codes(les_request::ContractCodes { - code_requests: vec![les_request::ContractCode { - block_hash: req.block_id.0, - account_key: ::util::Hashable::sha3(&req.address), - }] - }); - let pending = Pending::Code(req, sender); - - // we're looking for a peer with serveStateSince(num) - for (id, peer) in self.peers.read().iter() { - if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) { - match ctx.request_from(*id, les_req.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); - self.pending_requests.write().insert( - req_id, - pending - ); - return - } - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } - } - } - - trace!(target: "on_demand", "No suitable peer for request"); - self.orphaned_requests.write().push(pending) - } - /// Request proof-of-execution for a transaction. pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> Receiver> { let (sender, receiver) = oneshot::channel(); - self.dispatch_transaction_proof(ctx, req, sender); + self.dispatch(ctx, Pending::TxProof(req, sender)); receiver } - fn dispatch_transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof, sender: Sender>) { - let num = req.header.number(); - let les_req = LesRequest::TransactionProof(les_request::TransactionProof { - at: req.header.hash(), - from: req.tx.sender(), - gas: req.tx.gas, - gas_price: req.tx.gas_price, - action: req.tx.action.clone(), - value: req.tx.value, - data: req.tx.data.clone(), - }); - let pending = Pending::TxProof(req, sender); + // dispatch the request, with a "suitability" function to filter acceptable peers. + fn dispatch(&self, ctx: &BasicContext, pending: Pending) { + let mut builder = basic_request::RequestBuilder::default(); + builder.push(pending.make_request()) + .expect("make_request always returns fully complete request; qed"); + + let complete = builder.build(); - // we're looking for a peer with serveStateSince(num) for (id, peer) in self.peers.read().iter() { - if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) { - match ctx.request_from(*id, les_req.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); - self.pending_requests.write().insert( - req_id, - pending - ); - return - } - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), + if !peer.can_handle(&pending) { continue } + match ctx.request_from(*id, complete.clone()) { + Ok(req_id) => { + trace!(target: "on_demand", "Assigning request to peer {}", id); + self.pending_requests.write().insert( + req_id, + pending, + ); + return } + Err(e) => + trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), } } trace!(target: "on_demand", "No suitable peer for request"); - self.orphaned_requests.write().push(pending) + self.orphaned_requests.write().push(pending); } + // dispatch orphaned requests, and discard those for which the corresponding // receiver has been dropped. fn dispatch_orphaned(&self, ctx: &BasicContext) { @@ -494,30 +348,22 @@ impl OnDemand { let to_dispatch = ::std::mem::replace(&mut *self.orphaned_requests.write(), Vec::new()); - for orphaned in to_dispatch { - match orphaned { - Pending::HeaderByNumber(req, mut sender) => { - let hangup = match sender { + for mut orphaned in to_dispatch { + let hung_up = match orphaned { + Pending::HeaderProof(_, ref mut sender) => match *sender { ChtProofSender::Both(ref mut s) => check_hangup(s), - ChtProofSender::Header(ref mut s) => check_hangup(s), + ChtProofSender::Hash(ref mut s) => check_hangup(s), ChtProofSender::ChainScore(ref mut s) => check_hangup(s), - }; + }, + Pending::HeaderByHash(_, ref mut sender) => check_hangup(sender), + Pending::Block(_, ref mut sender) => check_hangup(sender), + Pending::BlockReceipts(_, ref mut sender) => check_hangup(sender), + Pending::Account(_, ref mut sender) => check_hangup(sender), + Pending::Code(_, ref mut sender) => check_hangup(sender), + Pending::TxProof(_, ref mut sender) => check_hangup(sender), + }; - if !hangup { self.dispatch_header_by_number(ctx, req, sender) } - } - Pending::HeaderByHash(req, mut sender) => - if !check_hangup(&mut sender) { self.dispatch_header_by_hash(ctx, req, sender) }, - Pending::Block(req, mut sender) => - if !check_hangup(&mut sender) { self.dispatch_block(ctx, req, sender) }, - Pending::BlockReceipts(req, mut sender) => - if !check_hangup(&mut sender) { self.dispatch_block_receipts(ctx, req, sender) }, - Pending::Account(req, mut sender) => - if !check_hangup(&mut sender) { self.dispatch_account(ctx, req, sender) }, - Pending::Code(req, mut sender) => - if !check_hangup(&mut sender) { self.dispatch_code(ctx, req, sender) }, - Pending::TxProof(req, mut sender) => - if !check_hangup(&mut sender) { self.dispatch_transaction_proof(ctx, req, sender) } - } + if !hung_up { self.dispatch(ctx, orphaned) } } } } @@ -555,218 +401,126 @@ impl Handler for OnDemand { self.dispatch_orphaned(ctx.as_basic()); } - fn on_header_proofs(&self, ctx: &EventContext, req_id: ReqId, proofs: &[(Bytes, Vec)]) { + fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) { let peer = ctx.peer(); let req = match self.pending_requests.write().remove(&req_id) { Some(req) => req, None => return, }; + let response = match responses.get(0) { + Some(response) => response, + None => { + trace!(target: "on_demand", "Ignoring empty response for request {}", req_id); + self.dispatch(ctx.as_basic(), req); + return; + } + }; + + // handle the response appropriately for the request. + // all branches which do not return early lead to disabling of the peer + // due to misbehavior. match req { - Pending::HeaderByNumber(req, sender) => { - if let Some(&(ref header, ref proof)) = proofs.get(0) { - match req.check_response(header, proof) { - Ok((header, score)) => { + Pending::HeaderProof(req, sender) => { + if let NetworkResponse::HeaderProof(ref response) = *response { + match req.check_response(&response.proof) { + Ok((hash, score)) => { let mut cache = self.cache.lock(); - let hash = header.hash(); - cache.insert_block_header(hash, header.clone()); - cache.insert_block_hash(header.number(), hash); + cache.insert_block_hash(req.num(), hash); cache.insert_chain_score(hash, score); match sender { - ChtProofSender::Both(sender) => sender.complete((header, score)), - ChtProofSender::Header(sender) => sender.complete(header), + ChtProofSender::Both(sender) => sender.complete((hash, score)), + ChtProofSender::Hash(sender) => sender.complete(hash), ChtProofSender::ChainScore(sender) => sender.complete(score), } - return } - Err(e) => { - warn!("Error handling response for header request: {:?}", e); - ctx.disable_peer(peer); - } + Err(e) => warn!("Error handling response for header request: {:?}", e), } } - - self.dispatch_header_by_number(ctx.as_basic(), req, sender); } - _ => panic!("Only header by number request fetches header proofs; qed"), - } - } - - fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) { - let peer = ctx.peer(); - let req = match self.pending_requests.write().remove(&req_id) { - Some(req) => req, - None => return, - }; - - match req { Pending::HeaderByHash(req, sender) => { - if let Some(ref header) = headers.get(0) { - match req.check_response(header) { - Ok(header) => { - self.cache.lock().insert_block_header(req.0, header.clone()); - sender.complete(header); - return - } - Err(e) => { - warn!("Error handling response for header request: {:?}", e); - ctx.disable_peer(peer); + if let NetworkResponse::Headers(ref response) = *response { + if let Some(header) = response.headers.get(0) { + match req.check_response(header) { + Ok(header) => { + self.cache.lock().insert_block_header(req.0, header.clone()); + sender.complete(header); + return + } + Err(e) => warn!("Error handling response for header request: {:?}", e), } } } - - self.dispatch_header_by_hash(ctx.as_basic(), req, sender); } - _ => panic!("Only header by hash request fetches headers; qed"), - } - } - - fn on_block_bodies(&self, ctx: &EventContext, req_id: ReqId, bodies: &[Bytes]) { - let peer = ctx.peer(); - let req = match self.pending_requests.write().remove(&req_id) { - Some(req) => req, - None => return, - }; - - match req { Pending::Block(req, sender) => { - if let Some(ref body) = bodies.get(0) { - match req.check_response(body) { + if let NetworkResponse::Body(ref response) = *response { + match req.check_response(&response.body) { Ok(block) => { - let body = encoded::Body::new(body.to_vec()); - self.cache.lock().insert_block_body(req.hash, body); + self.cache.lock().insert_block_body(req.hash, response.body.clone()); sender.complete(block); return } - Err(e) => { - warn!("Error handling response for block request: {:?}", e); - ctx.disable_peer(peer); - } + Err(e) => warn!("Error handling response for block request: {:?}", e), } } - - self.dispatch_block(ctx.as_basic(), req, sender); } - _ => panic!("Only block request fetches bodies; qed"), - } - } - - fn on_receipts(&self, ctx: &EventContext, req_id: ReqId, receipts: &[Vec]) { - let peer = ctx.peer(); - let req = match self.pending_requests.write().remove(&req_id) { - Some(req) => req, - None => return, - }; - - match req { Pending::BlockReceipts(req, sender) => { - if let Some(ref receipts) = receipts.get(0) { - match req.check_response(receipts) { + if let NetworkResponse::Receipts(ref response) = *response { + match req.check_response(&response.receipts) { Ok(receipts) => { let hash = req.0.hash(); self.cache.lock().insert_block_receipts(hash, receipts.clone()); sender.complete(receipts); return } - Err(e) => { - warn!("Error handling response for receipts request: {:?}", e); - ctx.disable_peer(peer); - } + Err(e) => warn!("Error handling response for receipts request: {:?}", e), } } - - self.dispatch_block_receipts(ctx.as_basic(), req, sender); } - _ => panic!("Only receipts request fetches receipts; qed"), - } - } - - fn on_state_proofs(&self, ctx: &EventContext, req_id: ReqId, proofs: &[Vec]) { - let peer = ctx.peer(); - let req = match self.pending_requests.write().remove(&req_id) { - Some(req) => req, - None => return, - }; - - match req { Pending::Account(req, sender) => { - if let Some(ref proof) = proofs.get(0) { - match req.check_response(proof) { - Ok(proof) => { - sender.complete(proof); + if let NetworkResponse::Account(ref response) = *response { + match req.check_response(&response.proof) { + Ok(maybe_account) => { + // TODO: validate against request outputs. + // needs engine + env info as part of request. + sender.complete(maybe_account); return } - Err(e) => { - warn!("Error handling response for state request: {:?}", e); - ctx.disable_peer(peer); - } + Err(e) => warn!("Error handling response for state request: {:?}", e), } } - - self.dispatch_account(ctx.as_basic(), req, sender); } - _ => panic!("Only account request fetches state proof; qed"), - } - } - - fn on_code(&self, ctx: &EventContext, req_id: ReqId, codes: &[Bytes]) { - let peer = ctx.peer(); - let req = match self.pending_requests.write().remove(&req_id) { - Some(req) => req, - None => return, - }; - - match req { Pending::Code(req, sender) => { - if let Some(code) = codes.get(0) { - match req.check_response(code.as_slice()) { + if let NetworkResponse::Code(ref response) = *response { + match req.check_response(response.code.as_slice()) { Ok(()) => { - sender.complete(code.clone()); + sender.complete(response.code.clone()); return } - Err(e) => { - warn!("Error handling response for code request: {:?}", e); - ctx.disable_peer(peer); - } + Err(e) => warn!("Error handling response for code request: {:?}", e), } - - self.dispatch_code(ctx.as_basic(), req, sender); } } - _ => panic!("Only code request fetches code; qed"), - } - } - - fn on_transaction_proof(&self, ctx: &EventContext, req_id: ReqId, items: &[DBValue]) { - let peer = ctx.peer(); - let req = match self.pending_requests.write().remove(&req_id) { - Some(req) => req, - None => return, - }; - - match req { Pending::TxProof(req, sender) => { - match req.check_response(items) { - ProvedExecution::Complete(executed) => { - sender.complete(Ok(executed)); - return - } - ProvedExecution::Failed(err) => { - sender.complete(Err(err)); - return - } - ProvedExecution::BadProof => { - warn!("Error handling response for transaction proof request"); - ctx.disable_peer(peer); + if let NetworkResponse::Execution(ref response) = *response { + match req.check_response(&response.items) { + ProvedExecution::Complete(executed) => { + sender.complete(Ok(executed)); + return + } + ProvedExecution::Failed(err) => { + sender.complete(Err(err)); + return + } + ProvedExecution::BadProof => warn!("Error handling response for transaction proof request"), } } - - self.dispatch_transaction_proof(ctx.as_basic(), req, sender); } - _ => panic!("Only transaction proof request dispatches transaction proof requests; qed"), } + + ctx.disable_peer(peer); } fn tick(&self, ctx: &BasicContext) { diff --git a/ethcore/light/src/on_demand/request.rs b/ethcore/light/src/on_demand/request.rs index 3a72db51d..4f028a71c 100644 --- a/ethcore/light/src/on_demand/request.rs +++ b/ethcore/light/src/on_demand/request.rs @@ -61,9 +61,9 @@ impl From> for Error { } } -/// Request for a header by number. +/// Request for header proof by number #[derive(Debug, Clone, PartialEq, Eq)] -pub struct HeaderByNumber { +pub struct HeaderProof { /// The header's number. num: u64, /// The cht number for the given block number. @@ -72,11 +72,11 @@ pub struct HeaderByNumber { cht_root: H256, } -impl HeaderByNumber { +impl HeaderProof { /// Construct a new header-by-number request. Fails if the given number is 0. /// Provide the expected CHT root to compare against. pub fn new(num: u64, cht_root: H256) -> Option { - ::cht::block_to_cht_number(num).map(|cht_num| HeaderByNumber { + ::cht::block_to_cht_number(num).map(|cht_num| HeaderProof { num: num, cht_num: cht_num, cht_root: cht_root, @@ -92,18 +92,11 @@ impl HeaderByNumber { /// Access the expected CHT root. pub fn cht_root(&self) -> H256 { self.cht_root } - /// Check a response with a header and cht proof. - pub fn check_response(&self, header: &[u8], proof: &[Bytes]) -> Result<(encoded::Header, U256), Error> { - let (expected_hash, td) = match ::cht::check_proof(proof, self.num, self.cht_root) { - Some((expected_hash, td)) => (expected_hash, td), - None => return Err(Error::BadProof), - }; - - // and compare the hash to the found header. - let found_hash = header.sha3(); - match expected_hash == found_hash { - true => Ok((encoded::Header::new(header.to_vec()), td)), - false => Err(Error::WrongHash(expected_hash, found_hash)), + /// Check a response with a CHT proof, get a hash and total difficulty back. + pub fn check_response(&self, proof: &[Bytes]) -> Result<(H256, U256), Error> { + match ::cht::check_proof(proof, self.num, self.cht_root) { + Some((expected_hash, td)) => Ok((expected_hash, td)), + None => Err(Error::BadProof), } } } @@ -114,10 +107,10 @@ pub struct HeaderByHash(pub H256); impl HeaderByHash { /// Check a response for the header. - pub fn check_response(&self, header: &[u8]) -> Result { + pub fn check_response(&self, header: &encoded::Header) -> Result { let hash = header.sha3(); match hash == self.0 { - true => Ok(encoded::Header::new(header.to_vec())), + true => Ok(header.clone()), false => Err(Error::WrongHash(self.0, hash)), } } @@ -143,16 +136,14 @@ impl Body { } /// Check a response for this block body. - pub fn check_response(&self, body: &[u8]) -> Result { - let body_view = UntrustedRlp::new(&body); - + pub fn check_response(&self, body: &encoded::Body) -> Result { // check the integrity of the the body against the header - let tx_root = ::util::triehash::ordered_trie_root(body_view.at(0)?.iter().map(|r| r.as_raw().to_vec())); + let tx_root = ::util::triehash::ordered_trie_root(body.rlp().at(0).iter().map(|r| r.as_raw().to_vec())); if tx_root != self.header.transactions_root() { return Err(Error::WrongTrieRoot(self.header.transactions_root(), tx_root)); } - let uncles_hash = body_view.at(1)?.as_raw().sha3(); + let uncles_hash = body.rlp().at(1).as_raw().sha3(); if uncles_hash != self.header.uncles_hash() { return Err(Error::WrongHash(self.header.uncles_hash(), uncles_hash)); } @@ -160,7 +151,7 @@ impl Body { // concatenate the header and the body. let mut stream = RlpStream::new_list(3); stream.append_raw(self.header.rlp().as_raw(), 1); - stream.append_raw(body, 2); + stream.append_raw(&body.rlp().as_raw(), 2); Ok(encoded::Block::new(stream.out())) } @@ -194,7 +185,7 @@ pub struct Account { impl Account { /// Check a response with an account against the stored header. - pub fn check_response(&self, proof: &[Bytes]) -> Result { + pub fn check_response(&self, proof: &[Bytes]) -> Result, Error> { let state_root = self.header.state_root(); let mut db = MemoryDB::new(); @@ -203,14 +194,14 @@ impl Account { match TrieDB::new(&db, &state_root).and_then(|t| t.get(&self.address.sha3()))? { Some(val) => { let rlp = UntrustedRlp::new(&val); - Ok(BasicAccount { + Ok(Some(BasicAccount { nonce: rlp.val_at(0)?, balance: rlp.val_at(1)?, storage_root: rlp.val_at(2)?, code_hash: rlp.val_at(3)?, - }) + })) }, - None => Err(Error::BadProof) + None => Ok(None), } } } @@ -219,8 +210,6 @@ impl Account { pub struct Code { /// Block hash, number pair. pub block_id: (H256, u64), - /// Address requested. - pub address: Address, /// Account's code hash. pub code_hash: H256, } @@ -278,11 +267,11 @@ mod tests { #[test] fn no_invalid_header_by_number() { - assert!(HeaderByNumber::new(0, Default::default()).is_none()) + assert!(HeaderProof::new(0, Default::default()).is_none()) } #[test] - fn check_header_by_number() { + fn check_header_proof() { use ::cht; let test_client = TestBlockChainClient::new(); @@ -303,11 +292,11 @@ mod tests { }; let proof = cht.prove(10_000, 0).unwrap().unwrap(); - let req = HeaderByNumber::new(10_000, cht.root()).unwrap(); + let req = HeaderProof::new(10_000, cht.root()).unwrap(); let raw_header = test_client.block_header(::ethcore::ids::BlockId::Number(10_000)).unwrap(); - assert!(req.check_response(&raw_header.into_inner(), &proof[..]).is_ok()); + assert!(req.check_response(&proof[..]).is_ok()); } #[test] @@ -334,7 +323,8 @@ mod tests { hash: header.hash(), }; - assert!(req.check_response(&*body_stream.drain()).is_ok()) + let response = encoded::Body::new(body_stream.drain()); + assert!(req.check_response(&response).is_ok()) } #[test] @@ -412,7 +402,6 @@ mod tests { let code = vec![1u8; 256]; let req = Code { block_id: (Default::default(), 2), - address: Default::default(), code_hash: ::util::Hashable::sha3(&code), }; diff --git a/ethcore/light/src/types/request/mod.rs b/ethcore/light/src/types/request/mod.rs index 1ebe1c75b..a3880da44 100644 --- a/ethcore/light/src/types/request/mod.rs +++ b/ethcore/light/src/types/request/mod.rs @@ -202,6 +202,8 @@ impl Encodable for HashOrNumber { } /// All request types, as they're sent over the network. +/// They may be incomplete, with back-references to outputs +/// of prior requests. #[derive(Debug, Clone, PartialEq, Eq)] pub enum Request { /// A request for block headers. @@ -223,7 +225,7 @@ pub enum Request { Execution(IncompleteExecutionRequest), } -/// All request types, as they're sent over the network. +/// All request types, in an answerable state. #[derive(Debug, Clone, PartialEq, Eq)] pub enum CompleteRequest { /// A request for block headers.