integrate cache in on-demand

This commit is contained in:
Robert Habermeier 2017-02-16 20:46:59 +01:00
parent 3b9741e9d8
commit 48cf591e66
3 changed files with 124 additions and 23 deletions

View File

@ -156,7 +156,7 @@ impl Cache {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::Cache; use super::Cache;
use time::{Duration, SteadyTime}; use time::Duration;
#[test] #[test]
fn corpus_inaccessible() { fn corpus_inaccessible() {

View File

@ -19,6 +19,7 @@
//! will take the raw data received here and extract meaningful results from it. //! will take the raw data received here and extract meaningful results from it.
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use ethcore::basic_account::BasicAccount; use ethcore::basic_account::BasicAccount;
use ethcore::encoded; use ethcore::encoded;
@ -28,10 +29,11 @@ use futures::{Async, Poll, Future};
use futures::sync::oneshot::{self, Sender, Receiver}; use futures::sync::oneshot::{self, Sender, Receiver};
use network::PeerId; use network::PeerId;
use rlp::{RlpStream, Stream}; use rlp::{RlpStream, Stream};
use util::{Bytes, RwLock, U256}; use util::{Bytes, RwLock, Mutex, U256};
use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP}; use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP};
use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId};
use cache::Cache;
use types::les_request::{self as les_request, Request as LesRequest}; use types::les_request::{self as les_request, Request as LesRequest};
pub mod request; pub mod request;
@ -42,9 +44,16 @@ struct Peer {
capabilities: Capabilities, capabilities: Capabilities,
} }
// Which portions of a CHT proof should be sent.
enum ChtProofSender {
Both(Sender<(encoded::Header, U256)>),
Header(Sender<encoded::Header>),
ChainScore(Sender<U256>),
}
// Attempted request info and sender to put received value. // Attempted request info and sender to put received value.
enum Pending { enum Pending {
HeaderByNumber(request::HeaderByNumber, Sender<(encoded::Header, U256)>), // num + CHT root HeaderByNumber(request::HeaderByNumber, ChtProofSender),
HeaderByHash(request::HeaderByHash, Sender<encoded::Header>), HeaderByHash(request::HeaderByHash, Sender<encoded::Header>),
Block(request::Body, Sender<encoded::Block>), Block(request::Body, Sender<encoded::Block>),
BlockReceipts(request::BlockReceipts, Sender<Vec<Receipt>>), BlockReceipts(request::BlockReceipts, Sender<Vec<Receipt>>),
@ -58,30 +67,77 @@ enum Pending {
pub struct OnDemand { pub struct OnDemand {
peers: RwLock<HashMap<PeerId, Peer>>, peers: RwLock<HashMap<PeerId, Peer>>,
pending_requests: RwLock<HashMap<ReqId, Pending>>, pending_requests: RwLock<HashMap<ReqId, Pending>>,
cache: Arc<Mutex<Cache>>,
orphaned_requests: RwLock<Vec<Pending>>, orphaned_requests: RwLock<Vec<Pending>>,
} }
impl Default for OnDemand { impl OnDemand {
fn default() -> Self { /// Create a new `OnDemand` service with the given cache.
pub fn new(cache: Arc<Mutex<Cache>>) -> Self {
OnDemand { OnDemand {
peers: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()),
pending_requests: RwLock::new(HashMap::new()), pending_requests: RwLock::new(HashMap::new()),
cache: cache,
orphaned_requests: RwLock::new(Vec::new()), orphaned_requests: RwLock::new(Vec::new()),
} }
} }
}
impl OnDemand {
/// Request a header by block number and CHT root hash. /// Request a header by block number and CHT root hash.
/// Returns the header and the total difficulty. /// Returns the header.
pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<(encoded::Header, U256)> { pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<encoded::Header> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.dispatch_header_by_number(ctx, req, sender); let cached = {
let mut cache = self.cache.lock();
cache.block_hash(&req.num()).and_then(|hash| cache.block_header(&hash))
};
match cached {
Some(hdr) => sender.complete(hdr),
None => self.dispatch_header_by_number(ctx, req, ChtProofSender::Header(sender)),
}
receiver
}
/// Request a canonical block's chain score.
/// Returns the chain score.
pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<U256> {
let (sender, receiver) = oneshot::channel();
let cached = {
let mut cache = self.cache.lock();
cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash))
};
match cached {
Some(score) => sender.complete(score),
None => self.dispatch_header_by_number(ctx, req, ChtProofSender::ChainScore(sender)),
}
receiver
}
/// Request a canonical block's chain score.
/// Returns the header and chain score.
pub fn header_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<(encoded::Header, U256)> {
let (sender, receiver) = oneshot::channel();
let cached = {
let mut cache = self.cache.lock();
let hash = cache.block_hash(&req.num());
(
hash.clone().and_then(|hash| cache.block_header(&hash)),
hash.and_then(|hash| cache.chain_score(&hash)),
)
};
match cached {
(Some(hdr), Some(score)) => sender.complete((hdr, score)),
_ => self.dispatch_header_by_number(ctx, req, ChtProofSender::Both(sender)),
}
receiver receiver
} }
// dispatch the request, completing the request if no peers available. // dispatch the request, completing the request if no peers available.
fn dispatch_header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber, sender: Sender<(encoded::Header, U256)>) { fn dispatch_header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber, sender: ChtProofSender) {
let num = req.num(); let num = req.num();
let cht_num = req.cht_num(); let cht_num = req.cht_num();
@ -123,7 +179,10 @@ impl OnDemand {
/// it as easily. /// it as easily.
pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver<encoded::Header> { pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver<encoded::Header> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.dispatch_header_by_hash(ctx, req, sender); match self.cache.lock().block_header(&req.0) {
Some(hdr) => sender.complete(hdr),
None => self.dispatch_header_by_hash(ctx, req, sender),
}
receiver receiver
} }
@ -181,7 +240,16 @@ impl OnDemand {
sender.complete(encoded::Block::new(stream.out())) sender.complete(encoded::Block::new(stream.out()))
} else { } else {
self.dispatch_block(ctx, req, sender); match self.cache.lock().block_body(&req.hash) {
Some(body) => {
let mut stream = RlpStream::new_list(3);
stream.append_raw(&req.header.into_inner(), 1);
stream.append_raw(&body.into_inner(), 2);
sender.complete(encoded::Block::new(stream.out()));
}
None => self.dispatch_block(ctx, req, sender),
}
} }
receiver receiver
} }
@ -224,7 +292,10 @@ impl OnDemand {
if req.0.receipts_root() == SHA3_NULL_RLP { if req.0.receipts_root() == SHA3_NULL_RLP {
sender.complete(Vec::new()) sender.complete(Vec::new())
} else { } else {
self.dispatch_block_receipts(ctx, req, sender); match self.cache.lock().block_receipts(&req.0.hash()) {
Some(receipts) => sender.complete(receipts),
None => self.dispatch_block_receipts(ctx, req, sender),
}
} }
receiver receiver
@ -378,8 +449,15 @@ impl OnDemand {
for orphaned in to_dispatch { for orphaned in to_dispatch {
match orphaned { match orphaned {
Pending::HeaderByNumber(req, mut sender) => Pending::HeaderByNumber(req, mut sender) => {
if !check_hangup(&mut sender) { self.dispatch_header_by_number(ctx, req, sender) }, let hangup = match sender {
ChtProofSender::Both(ref mut s) => check_hangup(s),
ChtProofSender::Header(ref mut s) => check_hangup(s),
ChtProofSender::ChainScore(ref mut s) => check_hangup(s),
};
if !hangup { self.dispatch_header_by_number(ctx, req, sender) }
}
Pending::HeaderByHash(req, mut sender) => Pending::HeaderByHash(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_header_by_hash(ctx, req, sender) }, if !check_hangup(&mut sender) { self.dispatch_header_by_hash(ctx, req, sender) },
Pending::Block(req, mut sender) => Pending::Block(req, mut sender) =>
@ -439,8 +517,19 @@ impl Handler for OnDemand {
Pending::HeaderByNumber(req, sender) => { Pending::HeaderByNumber(req, sender) => {
if let Some(&(ref header, ref proof)) = proofs.get(0) { if let Some(&(ref header, ref proof)) = proofs.get(0) {
match req.check_response(header, proof) { match req.check_response(header, proof) {
Ok(header) => { Ok((header, score)) => {
sender.complete(header); let mut cache = self.cache.lock();
let hash = header.hash();
cache.insert_block_header(hash, header.clone());
cache.insert_block_hash(header.number(), hash);
cache.insert_chain_score(hash, score);
match sender {
ChtProofSender::Both(sender) => sender.complete((header, score)),
ChtProofSender::Header(sender) => sender.complete(header),
ChtProofSender::ChainScore(sender) => sender.complete(score),
}
return return
} }
Err(e) => { Err(e) => {
@ -468,6 +557,7 @@ impl Handler for OnDemand {
if let Some(ref header) = headers.get(0) { if let Some(ref header) = headers.get(0) {
match req.check_response(header) { match req.check_response(header) {
Ok(header) => { Ok(header) => {
self.cache.lock().insert_block_header(req.0, header.clone());
sender.complete(header); sender.complete(header);
return return
} }
@ -493,9 +583,11 @@ impl Handler for OnDemand {
match req { match req {
Pending::Block(req, sender) => { Pending::Block(req, sender) => {
if let Some(ref block) = bodies.get(0) { if let Some(ref body) = bodies.get(0) {
match req.check_response(block) { match req.check_response(body) {
Ok(block) => { Ok(block) => {
let body = encoded::Body::new(body.to_vec());
self.cache.lock().insert_block_body(req.hash, body);
sender.complete(block); sender.complete(block);
return return
} }
@ -524,6 +616,8 @@ impl Handler for OnDemand {
if let Some(ref receipts) = receipts.get(0) { if let Some(ref receipts) = receipts.get(0) {
match req.check_response(receipts) { match req.check_response(receipts) {
Ok(receipts) => { Ok(receipts) => {
let hash = req.0.hash();
self.cache.lock().insert_block_receipts(hash, receipts.clone());
sender.complete(receipts); sender.complete(receipts);
return return
} }
@ -604,10 +698,16 @@ impl Handler for OnDemand {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use std::sync::Arc;
use cache::Cache;
use net::{Announcement, BasicContext, ReqId, Error as LesError}; use net::{Announcement, BasicContext, ReqId, Error as LesError};
use request::{Request as LesRequest, Kind as LesRequestKind}; use request::{Request as LesRequest, Kind as LesRequestKind};
use network::{PeerId, NodeId}; use network::{PeerId, NodeId};
use util::H256; use time::Duration;
use util::{H256, Mutex};
struct FakeContext; struct FakeContext;
@ -624,7 +724,8 @@ mod tests {
#[test] #[test]
fn detects_hangup() { fn detects_hangup() {
let on_demand = OnDemand::default(); let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
let on_demand = OnDemand::new(cache);
let result = on_demand.header_by_hash(&FakeContext, request::HeaderByHash(H256::default())); let result = on_demand.header_by_hash(&FakeContext, request::HeaderByHash(H256::default()));
assert!(on_demand.orphaned_requests.read().len() == 1); assert!(on_demand.orphaned_requests.read().len() == 1);

View File

@ -108,7 +108,7 @@ impl EthClient {
self.sync.with_context(|ctx| self.sync.with_context(|ctx|
self.on_demand.header_by_number(ctx, req) self.on_demand.header_by_number(ctx, req)
.map(|(h, _)| Some(h)) .map(Some)
.map_err(err_premature_cancel) .map_err(err_premature_cancel)
.boxed() .boxed()
) )