From 4dbbc3bc88779c094c60fb08979b1daa09365fa5 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 3 Jan 2017 16:18:46 +0100 Subject: [PATCH] beginnings of on_demand request module + verification --- ethcore/light/src/net/mod.rs | 4 +- ethcore/light/src/on_demand/mod.rs | 118 ++++++++++-------- ethcore/light/src/on_demand/request.rs | 158 +++++++++++++++++++++++++ 3 files changed, 231 insertions(+), 49 deletions(-) create mode 100644 ethcore/light/src/on_demand/request.rs diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index aa63aca3c..863382ecc 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -192,12 +192,12 @@ pub trait Handler: Send + Sync { fn on_block_headers(&self, _ctx: &EventContext, _req_id: ReqId, _headers: &[Bytes]) { } /// Called when a peer responds with block receipts. fn on_receipts(&self, _ctx: &EventContext, _req_id: ReqId, _receipts: &[Vec]) { } - /// Called when a peer responds with state proofs. Each proof is a series of trie + /// Called when a peer responds with state proofs. Each proof should be a series of trie /// nodes in ascending order by distance from the root. fn on_state_proofs(&self, _ctx: &EventContext, _req_id: ReqId, _proofs: &[Vec]) { } /// Called when a peer responds with contract code. fn on_code(&self, _ctx: &EventContext, _req_id: ReqId, _codes: &[Bytes]) { } - /// Called when a peer responds with header proofs. Each proof is a block header coupled + /// Called when a peer responds with header proofs. Each proof should be a block header coupled /// with a series of trie nodes is ascending order by distance from the root. fn on_header_proofs(&self, _ctx: &EventContext, _req_id: ReqId, _proofs: &[(Bytes, Vec)]) { } /// Called to "tick" the handler periodically. diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index f6efb95a2..d020ec8da 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -29,7 +29,9 @@ use network::PeerId; use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; use util::{Address, H256, U256, RwLock}; -use request::{self as les_request, Request as LesRequest}; +use types::les_request::{self as les_request, Request as LesRequest}; + +pub mod request; /// Basic account data. // TODO: [rob] unify with similar struct in `snapshot`. @@ -85,14 +87,13 @@ struct Peer { capabilities: Capabilities, } -// request info and where to send the result. -enum Request { - HeaderByNumber(u64, H256, Sender), // num + CHT root - HeaderByHash(H256, Sender), - Block(encoded::Header, H256, Sender), - BlockReceipts(encoded::Header, Sender>), - Account(encoded::Header, Address, Sender), - Storage(encoded::Header, Address, H256, Sender), +// Attempted request info and sender to put received value. +enum Attempted { + HeaderByNumber(request::HeaderByNumber, Sender), // num + CHT root + HeaderByHash(request::HeaderByHash, Sender), + Block(request::Block, Sender), + BlockReceipts(request::BlockReceipts, Sender>), + Account(request::Account, Sender), } /// On demand request service. See module docs for more details. @@ -103,36 +104,11 @@ pub struct OnDemand { pending_requests: RwLock>, } -impl Handler for OnDemand { - fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { - self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() }); - } - - fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { - self.peers.write().remove(&ctx.peer()); - - for unfulfilled in unfulfilled { - if let Some(pending) = self.pending_requests.write().remove(unfulfilled) { - trace!(target: "on_demand", "Attempting to reassign dropped request"); - self.dispatch_request(ctx.as_basic(), pending); - } - } - } - - fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { - let mut peers = self.peers.write(); - if let Some(ref mut peer) = peers.get_mut(&ctx.peer()) { - peer.status.update_from(&announcement); - peer.capabilities.update_from(&announcement); - } - } -} - impl OnDemand { /// Request a header by block number and CHT root hash. pub fn header_by_number(&self, ctx: &BasicContext, num: u64, cht_root: H256) -> Response { let (sender, receiver) = oneshot::channel(); - self.dispatch_request(ctx, Request::HeaderByNumber(num, cht_root, sender)); + self.dispatch_request(ctx, Pending::HeaderByNumber(num, cht_root, sender)); Response(receiver) } @@ -141,7 +117,7 @@ impl OnDemand { /// it as easily. pub fn header_by_hash(&self, ctx: &BasicContext, hash: H256) -> Response { let (sender, receiver) = oneshot::channel(); - self.dispatch_request(ctx, Request::HeaderByHash(hash, sender)); + self.dispatch_request(ctx, Pending::HeaderByHash(hash, sender)); Response(receiver) } @@ -151,7 +127,7 @@ impl OnDemand { pub fn block(&self, ctx: &BasicContext, header: encoded::Header) -> Response { let (sender, receiver) = oneshot::channel(); let hash = header.hash(); - self.dispatch_request(ctx, Request::Block(header, hash, sender)); + self.dispatch_request(ctx, Pending::Block(header, hash, sender)); Response(receiver) } @@ -159,7 +135,7 @@ impl OnDemand { /// provide the block hash to fetch receipts for, and for verification of the receipts root. pub fn block_receipts(&self, ctx: &BasicContext, header: encoded::Header) -> Response> { let (sender, receiver) = oneshot::channel(); - self.dispatch_request(ctx, Request::BlockReceipts(header, sender)); + self.dispatch_request(ctx, Pending::BlockReceipts(header, sender)); Response(receiver) } @@ -167,21 +143,23 @@ impl OnDemand { /// to verify against. pub fn account(&self, ctx: &BasicContext, header: encoded::Header, address: Address) -> Response { let (sender, receiver) = oneshot::channel(); - self.dispatch_request(ctx, Request::Account(header, address, sender)); + self.dispatch_request(ctx, Pending::Account(header, address, sender)); Response(receiver) } /// Request account storage value by block header, address, and key. pub fn storage(&self, ctx: &BasicContext, header: encoded::Header, address: Address, key: H256) -> Response { let (sender, receiver) = oneshot::channel(); - self.dispatch_request(ctx, Request::Storage(header, address, key, sender)); + self.dispatch_request(ctx, Pending::Storage(header, address, key, sender)); Response(receiver) } // dispatch a request to a suitable peer. + // + // TODO: most of this will become obsolete with a PeerSearch utility (#3987) fn dispatch_request(&self, ctx: &BasicContext, request: Request) { match request { - Request::HeaderByNumber(num, cht_hash, sender) => { + Pending::HeaderByNumber(request::HeaderByNumber(num, cht_hash), sender) => { let cht_num = ::client::cht::block_to_cht_number(num); let req = LesRequest::HeaderProofs(les_request::HeaderProofs { requests: vec![les_request::HeaderProof { @@ -200,7 +178,7 @@ impl OnDemand { trace!(target: "on_demand", "Assigning request to peer {}", id); self.pending_requests.write().insert( req_id, - Request::HeaderByNumber(num, cht_hash, sender) + Pending::HeaderByNumber(num, cht_hash, sender) ); return }, @@ -214,7 +192,7 @@ impl OnDemand { trace!(target: "on_demand", "No suitable peer for request"); sender.complete(Err(Error::NoPeersAvailable)); } - Request::HeaderByHash(hash, sender) => { + Pending::HeaderByHash(hash, sender) => { let req = LesRequest::Headers(les_request::Headers { start: hash.into(), max: 1, @@ -251,7 +229,7 @@ impl OnDemand { trace!(target: "on_demand", "No suitable peer for request"); sender.complete(Err(Error::NoPeersAvailable)); } - Request::Block(header, hash, sender) => { + Pending::Block(header, hash, sender) => { let num = header.number(); let req = LesRequest::Bodies(les_request::Bodies { block_hashes: vec![hash], @@ -279,7 +257,7 @@ impl OnDemand { trace!(target: "on_demand", "No suitable peer for request"); sender.complete(Err(Error::NoPeersAvailable)); } - Request::BlockReceipts(header, sender) => { + Pending::BlockReceipts(header, sender) => { let num = header.number(); let req = LesRequest::Receipts(les_request::Receipts { block_hashes: vec![header.hash()], @@ -307,7 +285,7 @@ impl OnDemand { trace!(target: "on_demand", "No suitable peer for request"); sender.complete(Err(Error::NoPeersAvailable)); } - Request::Account(header, address, sender) => { + Pending::Account(header, address, sender) => { let num = header.number(); let req = LesRequest::StateProofs(les_request::StateProofs { requests: vec![les_request::StateProof { @@ -340,7 +318,7 @@ impl OnDemand { trace!(target: "on_demand", "No suitable peer for request"); sender.complete(Err(Error::NoPeersAvailable)); } - Request::Storage(header, address, key, sender) => { + Pending::Storage(header, address, key, sender) => { let num = header.number(); let req = LesRequest::StateProofs(les_request::StateProofs { requests: vec![les_request::StateProof { @@ -376,3 +354,49 @@ impl OnDemand { } } } + +impl Handler for OnDemand { + fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { + self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() }); + } + + fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { + self.peers.write().remove(&ctx.peer()); + + for unfulfilled in unfulfilled { + if let Some(pending) = self.pending_requests.write().remove(unfulfilled) { + trace!(target: "on_demand", "Attempting to reassign dropped request"); + self.dispatch_request(ctx.as_basic(), pending); + } + } + } + + fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { + let mut peers = self.peers.write(); + if let Some(ref mut peer) = peers.get_mut(&ctx.peer()) { + peer.status.update_from(&announcement); + peer.capabilities.update_from(&announcement); + } + } + + fn on_header_proofs(&self, ctx: &EventContext, req_id: ReqId, proofs: &[(Bytes, Vec)]) { + let peer = ctx.peer(); + let req = match self.pending_requests.write().remove(&req_id) { + Some(req) => req, + None => return, + }; + + match req { + Request::HeaderByNumber(num, cht_root, sender) => { + let (ref header, ref proof) = match proofs.get(0) { + Some(ref x) => x, + None => { + ctx.disconnect_peer(peer); + return + } + }; + } + _ => panic!("Only header by number request fetches header proofs; qed"), + } + } +} diff --git a/ethcore/light/src/on_demand/request.rs b/ethcore/light/src/on_demand/request.rs new file mode 100644 index 000000000..b0bb8ab18 --- /dev/null +++ b/ethcore/light/src/on_demand/request.rs @@ -0,0 +1,158 @@ +use ethcore::encoded; +use ethcore::receipt::Receipt; + +use rlp::{RlpStream, Stream}; +use util::{Address, Bytes, HashDB, H256, U256}; +use util::memorydb::MemoryDB; +use util::sha3::Hashable; +use util::trie::{Trie, TrieDB, TrieError}; + +use super::Account as BasicAccount; + +/// Errors in verification. +#[derive(Debug, PartialEq)] +pub enum Error { + /// RLP decoder error. + Decoder(::rlp::DecoderError), + /// Trie lookup error (result of bad proof) + Trie(TrieError), + /// Bad inclusion proof + BadProof, + /// Wrong header number. + WrongNumber(u64, u64), + /// Wrong header hash. + WrongHash(H256, H256), + /// Wrong trie root. + WrongTrieRoot(H256, H256), +} + +impl From<::rlp::DecoderError> for Error { + fn from(err: ::rlp::DecoderError) -> Self { + Error::Decoder(err) + } +} + +impl From> for Error { + fn from(err: Box) -> Self { + Error::Trie(*err) + } +} + +/// Request for a header by number. +pub struct HeaderByNumber { + /// The header's number. + pub num: u64 + /// The root of the CHT containing this header. + pub cht_root: H256, +} + +impl HeaderByNumber { + /// Check a response with a header and cht proof. + pub fn check_response(&self, header: &[u8], proof: &[Bytes]) -> Result { + use util::trie::{Trie, TrieDB}; + use rlp::{UntrustedRlp, View}; + + // check the proof + let mut db = MemoryDB::new(); + + for node in proof { db.insert(&node[..]) } + let key = ::rlp::encode(&self.num); + + let expected_hash: H256 = match TrieDB::new(&db, &self.cht_root).and_then(|t| t.get(&*key))? { + Some(val) => ::rlp::decode(&val), + None => return Err(Error::BadProof) + }; + + // and compare the hash to the found header. + let found_hash = header.sha3(); + match expected_hash == found_hash { + true => Ok(encoded::Header::new(header.to_vec())), + false => Err(Error::WrongHash(expected_hash, found_hash)), + } + } +} + +/// Request for a header by hash. +pub struct HeaderByHash(pub H256); + +impl HeaderByHash { + /// Check a response for the header. + pub fn check_response(&self, header: &[u8]) -> Result { + let hash = header.sha3(); + match hash == self.0 { + true => Ok(encoded::Header::new(header.to_vec())), + false => Err(Error::WrongHash(self.0, hash)), + } + } +} + +/// Request for a block, with header and precomputed hash. +pub struct Body { + /// The block's header. + pub header: encoded::Header, + /// The block's hash. + pub hash: H256, +} + +impl Body { + /// Check a response for this block body. + pub fn check_response(&self, body: &[u8]) -> Result { + let body_view = UntrustedRlp::new(&body); + + // check the integrity of the the body against the header + let tx_root = ::util::triehash::ordered_trie_root(body_view.at(0)?.iter().map(|r| r.as_raw().to_vec())); + if tx_root != self.header.transactions_root() { + return Err(Error::WrongTrieRoot(self.header.transactions_root(), tx_root)); + } + + let uncles_hash = body_view.at(1)?.as_raw().sha3(); + if uncles_hash != self.header.uncles_hash() { + return Err(Error::WrongHash(self.header.uncles_hash(), uncles_hash); + } + + // concatenate the header and the body. + let mut stream = RlpStream::new_list(3); + stream.append_raw(header.rlp().as_raw(), 1); + stream.append_raw(body, 2); + + Ok(encoded::Block::new(stream.out())) + } +} + +/// Request for a block's receipts with header for verification. +pub struct BlockReceipts(pub encoded::Header); + +impl BlockReceipts { + /// Check a response with receipts against the stored header. + pub fn check_response(&self, receipts: &[Receipt]) -> Result, Error> { + let receipts_root = self.0.receipts_root(); + let found_root = ::util::triehash::ordered_trie_root(receipts.iter().map(|r| ::rlp::encode(r).to_vec())); + + match receipts_root == found_root { + true => Ok(receipts.to_vec()), + false => Err(Error::WrongTrieRoot(receipts_root, found_root)), + } + } +} + +/// Request for an account structure. +pub struct Account { + /// Header for verification. + pub header: encoded::Header, + /// Address requested. + pub address: Address, +} + +impl Account { + /// Check a response with an account against the stored header. + pub fn check_response(&self, proof: &[Bytes]) -> Result, Error> { + let state_root = header.state_root(); + + let mut db = MemoryDB::new(); + for node in proof { db.insert(&*node) } + + match TrieDB::new(&db, &state_root).and_then(|t| t.get(&address.sha3()))? { + + } + } +}