diff --git a/ethcore/light/src/cache.rs b/ethcore/light/src/cache.rs index a64c9076e..defa247ec 100644 --- a/ethcore/light/src/cache.rs +++ b/ethcore/light/src/cache.rs @@ -156,7 +156,7 @@ impl Cache { #[cfg(test)] mod tests { use super::Cache; - use time::{Duration, SteadyTime}; + use time::Duration; #[test] fn corpus_inaccessible() { diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index c34e2d922..ec3b758ce 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -19,6 +19,7 @@ //! will take the raw data received here and extract meaningful results from it. use std::collections::HashMap; +use std::sync::Arc; use ethcore::basic_account::BasicAccount; use ethcore::encoded; @@ -28,10 +29,11 @@ use futures::{Async, Poll, Future}; use futures::sync::oneshot::{self, Sender, Receiver}; use network::PeerId; use rlp::{RlpStream, Stream}; -use util::{Bytes, RwLock, U256}; +use util::{Bytes, RwLock, Mutex, U256}; use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP}; use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; +use cache::Cache; use types::les_request::{self as les_request, Request as LesRequest}; pub mod request; @@ -42,9 +44,16 @@ struct Peer { capabilities: Capabilities, } +// Which portions of a CHT proof should be sent. +enum ChtProofSender { + Both(Sender<(encoded::Header, U256)>), + Header(Sender), + ChainScore(Sender), +} + // Attempted request info and sender to put received value. enum Pending { - HeaderByNumber(request::HeaderByNumber, Sender<(encoded::Header, U256)>), // num + CHT root + HeaderByNumber(request::HeaderByNumber, ChtProofSender), HeaderByHash(request::HeaderByHash, Sender), Block(request::Body, Sender), BlockReceipts(request::BlockReceipts, Sender>), @@ -58,30 +67,77 @@ enum Pending { pub struct OnDemand { peers: RwLock>, pending_requests: RwLock>, + cache: Arc>, orphaned_requests: RwLock>, } -impl Default for OnDemand { - fn default() -> Self { +impl OnDemand { + /// Create a new `OnDemand` service with the given cache. + pub fn new(cache: Arc>) -> Self { OnDemand { peers: RwLock::new(HashMap::new()), pending_requests: RwLock::new(HashMap::new()), + cache: cache, orphaned_requests: RwLock::new(Vec::new()), } } -} -impl OnDemand { /// Request a header by block number and CHT root hash. - /// Returns the header and the total difficulty. - pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<(encoded::Header, U256)> { + /// Returns the header. + pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver { let (sender, receiver) = oneshot::channel(); - self.dispatch_header_by_number(ctx, req, sender); + let cached = { + let mut cache = self.cache.lock(); + cache.block_hash(&req.num()).and_then(|hash| cache.block_header(&hash)) + }; + + match cached { + Some(hdr) => sender.complete(hdr), + None => self.dispatch_header_by_number(ctx, req, ChtProofSender::Header(sender)), + } + receiver + } + + /// Request a canonical block's chain score. + /// Returns the chain score. + pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver { + let (sender, receiver) = oneshot::channel(); + let cached = { + let mut cache = self.cache.lock(); + cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash)) + }; + + match cached { + Some(score) => sender.complete(score), + None => self.dispatch_header_by_number(ctx, req, ChtProofSender::ChainScore(sender)), + } + + receiver + } + + /// Request a canonical block's chain score. + /// Returns the header and chain score. + pub fn header_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<(encoded::Header, U256)> { + let (sender, receiver) = oneshot::channel(); + let cached = { + let mut cache = self.cache.lock(); + let hash = cache.block_hash(&req.num()); + ( + hash.clone().and_then(|hash| cache.block_header(&hash)), + hash.and_then(|hash| cache.chain_score(&hash)), + ) + }; + + match cached { + (Some(hdr), Some(score)) => sender.complete((hdr, score)), + _ => self.dispatch_header_by_number(ctx, req, ChtProofSender::Both(sender)), + } + receiver } // dispatch the request, completing the request if no peers available. - fn dispatch_header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber, sender: Sender<(encoded::Header, U256)>) { + fn dispatch_header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber, sender: ChtProofSender) { let num = req.num(); let cht_num = req.cht_num(); @@ -123,7 +179,10 @@ impl OnDemand { /// it as easily. pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver { let (sender, receiver) = oneshot::channel(); - self.dispatch_header_by_hash(ctx, req, sender); + match self.cache.lock().block_header(&req.0) { + Some(hdr) => sender.complete(hdr), + None => self.dispatch_header_by_hash(ctx, req, sender), + } receiver } @@ -181,7 +240,16 @@ impl OnDemand { sender.complete(encoded::Block::new(stream.out())) } else { - self.dispatch_block(ctx, req, sender); + match self.cache.lock().block_body(&req.hash) { + Some(body) => { + let mut stream = RlpStream::new_list(3); + stream.append_raw(&req.header.into_inner(), 1); + stream.append_raw(&body.into_inner(), 2); + + sender.complete(encoded::Block::new(stream.out())); + } + None => self.dispatch_block(ctx, req, sender), + } } receiver } @@ -224,7 +292,10 @@ impl OnDemand { if req.0.receipts_root() == SHA3_NULL_RLP { sender.complete(Vec::new()) } else { - self.dispatch_block_receipts(ctx, req, sender); + match self.cache.lock().block_receipts(&req.0.hash()) { + Some(receipts) => sender.complete(receipts), + None => self.dispatch_block_receipts(ctx, req, sender), + } } receiver @@ -378,8 +449,15 @@ impl OnDemand { for orphaned in to_dispatch { match orphaned { - Pending::HeaderByNumber(req, mut sender) => - if !check_hangup(&mut sender) { self.dispatch_header_by_number(ctx, req, sender) }, + Pending::HeaderByNumber(req, mut sender) => { + let hangup = match sender { + ChtProofSender::Both(ref mut s) => check_hangup(s), + ChtProofSender::Header(ref mut s) => check_hangup(s), + ChtProofSender::ChainScore(ref mut s) => check_hangup(s), + }; + + if !hangup { self.dispatch_header_by_number(ctx, req, sender) } + } Pending::HeaderByHash(req, mut sender) => if !check_hangup(&mut sender) { self.dispatch_header_by_hash(ctx, req, sender) }, Pending::Block(req, mut sender) => @@ -439,8 +517,19 @@ impl Handler for OnDemand { Pending::HeaderByNumber(req, sender) => { if let Some(&(ref header, ref proof)) = proofs.get(0) { match req.check_response(header, proof) { - Ok(header) => { - sender.complete(header); + Ok((header, score)) => { + let mut cache = self.cache.lock(); + let hash = header.hash(); + cache.insert_block_header(hash, header.clone()); + cache.insert_block_hash(header.number(), hash); + cache.insert_chain_score(hash, score); + + match sender { + ChtProofSender::Both(sender) => sender.complete((header, score)), + ChtProofSender::Header(sender) => sender.complete(header), + ChtProofSender::ChainScore(sender) => sender.complete(score), + } + return } Err(e) => { @@ -468,6 +557,7 @@ impl Handler for OnDemand { if let Some(ref header) = headers.get(0) { match req.check_response(header) { Ok(header) => { + self.cache.lock().insert_block_header(req.0, header.clone()); sender.complete(header); return } @@ -493,9 +583,11 @@ impl Handler for OnDemand { match req { Pending::Block(req, sender) => { - if let Some(ref block) = bodies.get(0) { - match req.check_response(block) { + if let Some(ref body) = bodies.get(0) { + match req.check_response(body) { Ok(block) => { + let body = encoded::Body::new(body.to_vec()); + self.cache.lock().insert_block_body(req.hash, body); sender.complete(block); return } @@ -524,6 +616,8 @@ impl Handler for OnDemand { if let Some(ref receipts) = receipts.get(0) { match req.check_response(receipts) { Ok(receipts) => { + let hash = req.0.hash(); + self.cache.lock().insert_block_receipts(hash, receipts.clone()); sender.complete(receipts); return } @@ -604,10 +698,16 @@ impl Handler for OnDemand { #[cfg(test)] mod tests { use super::*; + + use std::sync::Arc; + + use cache::Cache; use net::{Announcement, BasicContext, ReqId, Error as LesError}; use request::{Request as LesRequest, Kind as LesRequestKind}; + use network::{PeerId, NodeId}; - use util::H256; + use time::Duration; + use util::{H256, Mutex}; struct FakeContext; @@ -624,7 +724,8 @@ mod tests { #[test] fn detects_hangup() { - let on_demand = OnDemand::default(); + let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6)))); + let on_demand = OnDemand::new(cache); let result = on_demand.header_by_hash(&FakeContext, request::HeaderByHash(H256::default())); assert!(on_demand.orphaned_requests.read().len() == 1); diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index 944b419f7..b739959c5 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -108,7 +108,7 @@ impl EthClient { self.sync.with_context(|ctx| self.on_demand.header_by_number(ctx, req) - .map(|(h, _)| Some(h)) + .map(Some) .map_err(err_premature_cancel) .boxed() )