use cache in on-demand again
This commit is contained in:
@@ -27,11 +27,10 @@ use std::sync::Arc;
|
||||
use ethcore::basic_account::BasicAccount;
|
||||
use ethcore::encoded;
|
||||
use ethcore::receipt::Receipt;
|
||||
use ethcore::state::ProvedExecution;
|
||||
use ethcore::executed::{Executed, ExecutionError};
|
||||
|
||||
use futures::{Async, Poll, Future};
|
||||
use futures::sync::oneshot::{self, Sender, Receiver};
|
||||
use futures::{future, Async, Poll, Future, BoxFuture};
|
||||
use futures::sync::oneshot::{self, Sender, Receiver, Canceled};
|
||||
use network::PeerId;
|
||||
use rlp::RlpStream;
|
||||
use util::{Bytes, RwLock, Mutex, U256, H256};
|
||||
@@ -39,11 +38,14 @@ use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY, SHA3_EMPTY_LIST_RLP};
|
||||
|
||||
use net::{self, Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId};
|
||||
use cache::Cache;
|
||||
use request::{self as basic_request, Request as NetworkRequest, Response as NetworkResponse};
|
||||
use request::{self as basic_request, Request as NetworkRequest};
|
||||
|
||||
pub mod request;
|
||||
|
||||
pub use self::request::{CheckedRequest ,Request, Response};
|
||||
pub use self::request::{CheckedRequest, Request, Response};
|
||||
|
||||
/// The result of execution
|
||||
pub type ExecutionResult = Result<Executed, ExecutionError>;
|
||||
|
||||
// relevant peer info.
|
||||
struct Peer {
|
||||
@@ -62,13 +64,6 @@ impl Peer {
|
||||
}
|
||||
}
|
||||
|
||||
// Which portions of a CHT proof should be sent.
|
||||
enum ChtProofSender {
|
||||
Both(Sender<(H256, U256)>),
|
||||
Hash(Sender<H256>),
|
||||
ChainScore(Sender<U256>),
|
||||
}
|
||||
|
||||
// Attempted request info and sender to put received value.
|
||||
struct Pending {
|
||||
requests: basic_request::Requests<CheckedRequest>,
|
||||
@@ -127,7 +122,7 @@ pub struct OnDemand {
|
||||
cache: Arc<Mutex<Cache>>,
|
||||
}
|
||||
|
||||
const RECEIVER_IN_SCOPE: &'static str = "Receiver is still in scope, so it's not dropped; qed";
|
||||
const RESPONSES_MATCH: &'static str = "N requests always leads to N responses; qed";
|
||||
|
||||
impl OnDemand {
|
||||
/// Create a new `OnDemand` service with the given cache.
|
||||
@@ -140,151 +135,191 @@ impl OnDemand {
|
||||
}
|
||||
}
|
||||
|
||||
// /// Request a header's hash by block number and CHT root hash.
|
||||
// /// Returns the hash.
|
||||
// pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<H256> {
|
||||
// let (sender, receiver) = oneshot::channel();
|
||||
// let cached = {
|
||||
// let mut cache = self.cache.lock();
|
||||
// cache.block_hash(&req.num())
|
||||
// };
|
||||
/// Request a header's hash by block number and CHT root hash.
|
||||
/// Returns the hash.
|
||||
pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture<H256, Canceled> {
|
||||
let cached = {
|
||||
let mut cache = self.cache.lock();
|
||||
cache.block_hash(&req.num())
|
||||
};
|
||||
|
||||
// match cached {
|
||||
// Some(hash) => sender.send(hash).expect(RECEIVER_IN_SCOPE),
|
||||
// None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Hash(sender))),
|
||||
// }
|
||||
// receiver
|
||||
// }
|
||||
match cached {
|
||||
Some(hash) => future::ok(hash).boxed(),
|
||||
None => {
|
||||
self.make_requests(ctx, vec![Request::HeaderProof(req)])
|
||||
.expect("request given fully fleshed out; qed")
|
||||
.map(|responses| match responses[0] {
|
||||
Response::HeaderProof(ref hash, _) => *hash,
|
||||
_ => panic!("header proof request leads to header proof response; qed")
|
||||
})
|
||||
.boxed()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// /// Request a canonical block's chain score.
|
||||
// /// Returns the chain score.
|
||||
// pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<U256> {
|
||||
// let (sender, receiver) = oneshot::channel();
|
||||
// let cached = {
|
||||
// let mut cache = self.cache.lock();
|
||||
// cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash))
|
||||
// };
|
||||
/// Request a canonical block's chain score.
|
||||
/// Returns the chain score.
|
||||
pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture<U256, Canceled> {
|
||||
let cached = {
|
||||
let mut cache = self.cache.lock();
|
||||
cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash))
|
||||
};
|
||||
|
||||
// match cached {
|
||||
// Some(score) => sender.send(score).expect(RECEIVER_IN_SCOPE),
|
||||
// None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::ChainScore(sender))),
|
||||
// }
|
||||
match cached {
|
||||
Some(score) => future::ok(score).boxed(),
|
||||
None => {
|
||||
self.make_requests(ctx, vec![Request::HeaderProof(req)])
|
||||
.expect("request given fully fleshed out; qed")
|
||||
.map(|responses| match responses[0] {
|
||||
Response::HeaderProof(_, ref score) => *score,
|
||||
_ => panic!("header proof request leads to header proof response; qed")
|
||||
})
|
||||
.boxed()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// receiver
|
||||
// }
|
||||
/// Request a canonical block's hash and chain score by number.
|
||||
/// Returns the hash and chain score.
|
||||
pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture<(H256, U256), Canceled> {
|
||||
let cached = {
|
||||
let mut cache = self.cache.lock();
|
||||
let hash = cache.block_hash(&req.num());
|
||||
(
|
||||
hash.clone(),
|
||||
hash.and_then(|hash| cache.chain_score(&hash)),
|
||||
)
|
||||
};
|
||||
|
||||
// /// Request a canonical block's hash and chain score by number.
|
||||
// /// Returns the hash and chain score.
|
||||
// pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<(H256, U256)> {
|
||||
// let (sender, receiver) = oneshot::channel();
|
||||
// let cached = {
|
||||
// let mut cache = self.cache.lock();
|
||||
// let hash = cache.block_hash(&req.num());
|
||||
// (
|
||||
// hash.clone(),
|
||||
// hash.and_then(|hash| cache.chain_score(&hash)),
|
||||
// )
|
||||
// };
|
||||
match cached {
|
||||
(Some(hash), Some(score)) => future::ok((hash, score)).boxed(),
|
||||
_ => {
|
||||
self.make_requests(ctx, vec![Request::HeaderProof(req)])
|
||||
.expect("request given fully fleshed out; qed")
|
||||
.map(|responses| match responses[0] {
|
||||
Response::HeaderProof(ref hash, ref score) => (*hash, *score),
|
||||
_ => panic!("header proof request leads to header proof response; qed")
|
||||
})
|
||||
.boxed()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// match cached {
|
||||
// (Some(hash), Some(score)) => sender.send((hash, score)).expect(RECEIVER_IN_SCOPE),
|
||||
// _ => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Both(sender))),
|
||||
// }
|
||||
/// Request a header by hash. This is less accurate than by-number because we don't know
|
||||
/// where in the chain this header lies, and therefore can't find a peer who is supposed to have
|
||||
/// it as easily.
|
||||
pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> BoxFuture<encoded::Header, Canceled> {
|
||||
match { self.cache.lock().block_header(&req.0) } {
|
||||
Some(hdr) => future::ok(hdr).boxed(),
|
||||
None => {
|
||||
self.make_requests(ctx, vec![Request::HeaderByHash(req)])
|
||||
.expect("request given fully fleshed out; qed")
|
||||
.map(|mut responses| match responses.pop().expect(RESPONSES_MATCH) {
|
||||
Response::HeaderByHash(header) => header,
|
||||
_ => panic!("header request leads to header response; qed")
|
||||
})
|
||||
.boxed()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// receiver
|
||||
// }
|
||||
/// Request a block, given its header. Block bodies are requestable by hash only,
|
||||
/// and the header is required anyway to verify and complete the block body
|
||||
/// -- this just doesn't obscure the network query.
|
||||
pub fn block(&self, ctx: &BasicContext, req: request::Body) -> BoxFuture<encoded::Block, Canceled> {
|
||||
// fast path for empty body.
|
||||
if req.header.transactions_root() == SHA3_NULL_RLP && req.header.uncles_hash() == SHA3_EMPTY_LIST_RLP {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append_raw(&req.header.into_inner(), 1);
|
||||
stream.begin_list(0);
|
||||
stream.begin_list(0);
|
||||
|
||||
// /// Request a header by hash. This is less accurate than by-number because we don't know
|
||||
// /// where in the chain this header lies, and therefore can't find a peer who is supposed to have
|
||||
// /// it as easily.
|
||||
// pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver<encoded::Header> {
|
||||
// let (sender, receiver) = oneshot::channel();
|
||||
// match { self.cache.lock().block_header(&req.0) } {
|
||||
// Some(hdr) => sender.send(hdr).expect(RECEIVER_IN_SCOPE),
|
||||
// None => self.dispatch(ctx, Pending::HeaderByHash(req, sender)),
|
||||
// }
|
||||
// receiver
|
||||
// }
|
||||
future::ok(encoded::Block::new(stream.out())).boxed()
|
||||
} else {
|
||||
match { self.cache.lock().block_body(&req.hash) } {
|
||||
Some(body) => {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
let body = body.rlp();
|
||||
stream.append_raw(&req.header.into_inner(), 1);
|
||||
stream.append_raw(&body.at(0).as_raw(), 1);
|
||||
stream.append_raw(&body.at(1).as_raw(), 1);
|
||||
|
||||
// /// Request a block, given its header. Block bodies are requestable by hash only,
|
||||
// /// and the header is required anyway to verify and complete the block body
|
||||
// /// -- this just doesn't obscure the network query.
|
||||
// pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Receiver<encoded::Block> {
|
||||
// let (sender, receiver) = oneshot::channel();
|
||||
future::ok(encoded::Block::new(stream.out())).boxed()
|
||||
}
|
||||
None => {
|
||||
self.make_requests(ctx, vec![Request::Body(req)])
|
||||
.expect("request given fully fleshed out; qed")
|
||||
.map(|mut responses| match responses.pop().expect(RESPONSES_MATCH) {
|
||||
Response::Body(body) => body,
|
||||
_ => panic!("body request leads to body response; qed")
|
||||
})
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// // fast path for empty body.
|
||||
// if req.header.transactions_root() == SHA3_NULL_RLP && req.header.uncles_hash() == SHA3_EMPTY_LIST_RLP {
|
||||
// let mut stream = RlpStream::new_list(3);
|
||||
// stream.append_raw(&req.header.into_inner(), 1);
|
||||
// stream.begin_list(0);
|
||||
// stream.begin_list(0);
|
||||
/// Request the receipts for a block. The header serves two purposes:
|
||||
/// provide the block hash to fetch receipts for, and for verification of the receipts root.
|
||||
pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> BoxFuture<Vec<Receipt>, Canceled> {
|
||||
// fast path for empty receipts.
|
||||
if req.0.receipts_root() == SHA3_NULL_RLP {
|
||||
return future::ok(Vec::new()).boxed()
|
||||
}
|
||||
|
||||
// sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE);
|
||||
// } else {
|
||||
// match { self.cache.lock().block_body(&req.hash) } {
|
||||
// Some(body) => {
|
||||
// let mut stream = RlpStream::new_list(3);
|
||||
// let body = body.rlp();
|
||||
// stream.append_raw(&req.header.into_inner(), 1);
|
||||
// stream.append_raw(&body.at(0).as_raw(), 1);
|
||||
// stream.append_raw(&body.at(1).as_raw(), 1);
|
||||
match { self.cache.lock().block_receipts(&req.0.hash()) } {
|
||||
Some(receipts) => future::ok(receipts).boxed(),
|
||||
None => {
|
||||
self.make_requests(ctx, vec![Request::Receipts(req)])
|
||||
.expect("request given fully fleshed out; qed")
|
||||
.map(|mut responses| match responses.pop().expect(RESPONSES_MATCH) {
|
||||
Response::Receipts(receipts) => receipts,
|
||||
_ => panic!("receipts request leads to receipts response; qed")
|
||||
})
|
||||
.boxed()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE);
|
||||
// }
|
||||
// None => self.dispatch(ctx, Pending::Block(req, sender)),
|
||||
// }
|
||||
// }
|
||||
// receiver
|
||||
// }
|
||||
/// Request an account by address and block header -- which gives a hash to query and a state root
|
||||
/// to verify against.
|
||||
/// `None` here means that no account by the queried key exists in the queried state.
|
||||
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> BoxFuture<Option<BasicAccount>, Canceled> {
|
||||
self.make_requests(ctx, vec![Request::Account(req)])
|
||||
.expect("request given fully fleshed out; qed")
|
||||
.map(|mut responses| match responses.pop().expect(RESPONSES_MATCH) {
|
||||
Response::Account(account) => account,
|
||||
_ => panic!("account request leads to account response; qed")
|
||||
})
|
||||
.boxed()
|
||||
}
|
||||
|
||||
// /// Request the receipts for a block. The header serves two purposes:
|
||||
// /// provide the block hash to fetch receipts for, and for verification of the receipts root.
|
||||
// pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Receiver<Vec<Receipt>> {
|
||||
// let (sender, receiver) = oneshot::channel();
|
||||
/// Request code by address, known code hash, and block header.
|
||||
pub fn code(&self, ctx: &BasicContext, req: request::Code) -> BoxFuture<Bytes, Canceled> {
|
||||
// fast path for no code.
|
||||
if req.code_hash == SHA3_EMPTY {
|
||||
future::ok(Vec::new()).boxed()
|
||||
} else {
|
||||
self.make_requests(ctx, vec![Request::Code(req)])
|
||||
.expect("request given fully fleshed out; qed")
|
||||
.map(|mut responses| match responses.pop().expect(RESPONSES_MATCH) {
|
||||
Response::Code(code) => code,
|
||||
_ => panic!("code request leads to code response; qed")
|
||||
})
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
// // fast path for empty receipts.
|
||||
// if req.0.receipts_root() == SHA3_NULL_RLP {
|
||||
// sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE);
|
||||
// } else {
|
||||
// match { self.cache.lock().block_receipts(&req.0.hash()) } {
|
||||
// Some(receipts) => sender.send(receipts).expect(RECEIVER_IN_SCOPE),
|
||||
// None => self.dispatch(ctx, Pending::BlockReceipts(req, sender)),
|
||||
// }
|
||||
// }
|
||||
|
||||
// receiver
|
||||
// }
|
||||
|
||||
// /// Request an account by address and block header -- which gives a hash to query and a state root
|
||||
// /// to verify against.
|
||||
// pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver<BasicAccount> {
|
||||
// let (sender, receiver) = oneshot::channel();
|
||||
// self.dispatch(ctx, Pending::Account(req, sender));
|
||||
// receiver
|
||||
// }
|
||||
|
||||
// /// Request code by address, known code hash, and block header.
|
||||
// pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Receiver<Bytes> {
|
||||
// let (sender, receiver) = oneshot::channel();
|
||||
|
||||
// // fast path for no code.
|
||||
// if req.code_hash == SHA3_EMPTY {
|
||||
// sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE)
|
||||
// } else {
|
||||
// self.dispatch(ctx, Pending::Code(req, sender));
|
||||
// }
|
||||
|
||||
// receiver
|
||||
// }
|
||||
|
||||
// /// Request proof-of-execution for a transaction.
|
||||
// pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> Receiver<Result<Executed, ExecutionError>> {
|
||||
// let (sender, receiver) = oneshot::channel();
|
||||
|
||||
// self.dispatch(ctx, Pending::TxProof(req, sender));
|
||||
|
||||
// receiver
|
||||
// }
|
||||
/// Request proof-of-execution for a transaction.
|
||||
pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> BoxFuture<ExecutionResult, Canceled> {
|
||||
self.make_requests(ctx, vec![Request::Execution(req)])
|
||||
.expect("request given fully fleshed out; qed")
|
||||
.map(|mut responses| match responses.pop().expect(RESPONSES_MATCH) {
|
||||
Response::Execution(execution) => execution,
|
||||
_ => panic!("execution request leads to execution response; qed")
|
||||
})
|
||||
.boxed()
|
||||
}
|
||||
|
||||
/// Submit a batch of requests.
|
||||
///
|
||||
@@ -295,6 +330,11 @@ impl OnDemand {
|
||||
{
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
|
||||
if requests.is_empty() {
|
||||
assert!(sender.send(Vec::new()).is_ok(), "receiver still in scope; qed");
|
||||
return Ok(receiver);
|
||||
}
|
||||
|
||||
let mut builder = basic_request::RequestBuilder::default();
|
||||
|
||||
let responses = Vec::with_capacity(requests.len());
|
||||
@@ -314,6 +354,8 @@ impl OnDemand {
|
||||
sender: sender,
|
||||
});
|
||||
|
||||
self.dispatch_pending(ctx);
|
||||
|
||||
Ok(receiver)
|
||||
}
|
||||
|
||||
@@ -350,11 +392,13 @@ impl OnDemand {
|
||||
let peers = self.peers.read();
|
||||
*pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter()
|
||||
.filter_map(|mut pending| match check_hangup(&mut pending.sender) {
|
||||
true => Some(pending),
|
||||
false => None,
|
||||
false => Some(pending),
|
||||
true => None,
|
||||
})
|
||||
.filter_map(|pending| {
|
||||
for (peer_id, peer) in peers.iter() { // .shuffle?
|
||||
// TODO: see which requests can be answered by the cache?
|
||||
|
||||
if !peer.can_fulfill(&pending.required_capabilities) {
|
||||
continue
|
||||
}
|
||||
@@ -412,19 +456,20 @@ impl Handler for OnDemand {
|
||||
fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) {
|
||||
use request::IncompleteRequest;
|
||||
|
||||
let peer = ctx.peer();
|
||||
let mut pending = match self.in_transit.write().remove(&req_id) {
|
||||
Some(req) => req,
|
||||
None => return,
|
||||
};
|
||||
|
||||
// for each incoming response
|
||||
// 1. ensure verification data filled.
|
||||
// 1. ensure verification data filled. (still TODO since on_demand doesn't use back-references yet)
|
||||
// 2. pending.requests.supply_response
|
||||
// 3. if extracted on-demand response
|
||||
for response in responses {
|
||||
match pending.requests.supply_response(response) {
|
||||
Ok(response) => pending.responses.push(response),
|
||||
match pending.requests.supply_response(&*self.cache, response) {
|
||||
Ok(response) => {
|
||||
pending.responses.push(response)
|
||||
}
|
||||
Err(e) => {
|
||||
let peer = ctx.peer();
|
||||
debug!(target: "on_demand", "Peer {} gave bad response: {:?}", peer, e);
|
||||
@@ -500,13 +545,13 @@ mod tests {
|
||||
#[test]
|
||||
fn detects_hangup() {
|
||||
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
|
||||
let on_demand = OnDemand::new(cache, 0.into());
|
||||
let on_demand = OnDemand::new(cache);
|
||||
let result = on_demand.header_by_hash(&FakeContext, request::HeaderByHash(H256::default()));
|
||||
|
||||
assert!(on_demand.orphaned_requests.read().len() == 1);
|
||||
assert!(on_demand.pending.read().len() == 1);
|
||||
drop(result);
|
||||
|
||||
on_demand.dispatch_pending(&FakeContext);
|
||||
assert!(on_demand.orphaned_requests.read().is_empty());
|
||||
assert!(on_demand.pending.read().is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user