diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index fcdebb2d4..145e66703 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -43,6 +43,8 @@ use request::{self as basic_request, Request as NetworkRequest, Response as Netw pub mod request; +pub use self::request::{CheckedRequest ,Request, Response}; + // relevant peer info. struct Peer { status: Status, @@ -50,23 +52,13 @@ struct Peer { } impl Peer { - // Whether a given peer can handle a specific request. - fn can_handle(&self, pending: &Pending) -> bool { - match *pending { - Pending::HeaderProof(ref req, _) => - self.capabilities.serve_headers && self.status.head_num > req.num(), - Pending::HeaderByHash(_, _) => self.capabilities.serve_headers, - Pending::Block(ref req, _) => - self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x <= req.header.number()), - Pending::BlockReceipts(ref req, _) => - self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x <= req.0.number()), - Pending::Account(ref req, _) => - self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.header.number()), - Pending::Code(ref req, _) => - self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.block_id.1), - Pending::TxProof(ref req, _) => - self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.header.number()), - } + // whether this peer can fulfill the + fn can_fulfill(&self, c: &Capabilities) -> bool { + let caps = &self.capabilities; + + caps.serve_headers == c.serve_headers && + caps.serve_chain_since >= c.serve_chain_since && + caps.serve_state_since >= c.serve_chain_since } } @@ -78,262 +70,256 @@ enum ChtProofSender { } // Attempted request info and sender to put received value. -enum Pending { - HeaderProof(request::HeaderProof, ChtProofSender), - HeaderByHash(request::HeaderByHash, Sender), - Block(request::Body, Sender), - BlockReceipts(request::BlockReceipts, Sender>), - Account(request::Account, Sender), - Code(request::Code, Sender), - TxProof(request::TransactionProof, Sender>), +struct Pending { + requests: basic_request::Requests, + net_requests: basic_request::Requests, + required_capabilities: Capabilities, + responses: Vec, + sender: oneshot::Sender>, } -impl Pending { - // Create a network request. - fn make_request(&self) -> NetworkRequest { - match *self { - Pending::HeaderByHash(ref req, _) => NetworkRequest::Headers(basic_request::IncompleteHeadersRequest { - start: basic_request::HashOrNumber::Hash(req.0).into(), - skip: 0, - max: 1, - reverse: false, - }), - Pending::HeaderProof(ref req, _) => NetworkRequest::HeaderProof(basic_request::IncompleteHeaderProofRequest { - num: req.num().into(), - }), - Pending::Block(ref req, _) => NetworkRequest::Body(basic_request::IncompleteBodyRequest { - hash: req.hash.into(), - }), - Pending::BlockReceipts(ref req, _) => NetworkRequest::Receipts(basic_request::IncompleteReceiptsRequest { - hash: req.0.hash().into(), - }), - Pending::Account(ref req, _) => NetworkRequest::Account(basic_request::IncompleteAccountRequest { - block_hash: req.header.hash().into(), - address_hash: ::util::Hashable::sha3(&req.address).into(), - }), - Pending::Code(ref req, _) => NetworkRequest::Code(basic_request::IncompleteCodeRequest { - block_hash: req.block_id.0.into(), - code_hash: req.code_hash.into(), - }), - Pending::TxProof(ref req, _) => NetworkRequest::Execution(basic_request::IncompleteExecutionRequest { - block_hash: req.header.hash().into(), - from: req.tx.sender(), - gas: req.tx.gas, - gas_price: req.tx.gas_price, - action: req.tx.action.clone(), - value: req.tx.value, - data: req.tx.data.clone(), - }), +// helper to guess capabilities required for a given batch of network requests. +fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities { + let mut caps = Capabilities { + serve_headers: false, + serve_chain_since: None, + serve_state_since: None, + tx_relay: false, + }; + + let update_since = |current: &mut Option, new| + *current = match *current { + Some(x) => Some(::std::cmp::min(x, new)), + None => Some(new), + }; + + for request in requests { + match *request { + // TODO: might be worth returning a required block number for this also. + CheckedRequest::HeaderProof(_, _) => + caps.serve_headers = true, + CheckedRequest::HeaderByHash(_, _) => + caps.serve_headers = true, + CheckedRequest::Body(ref req, _) => + update_since(&mut caps.serve_chain_since, req.header.number()), + CheckedRequest::Receipts(ref req, _) => + update_since(&mut caps.serve_chain_since, req.0.number()), + CheckedRequest::Account(ref req, _) => + update_since(&mut caps.serve_state_since, req.header.number()), + CheckedRequest::Code(ref req, _) => + update_since(&mut caps.serve_state_since, req.block_id.1), + CheckedRequest::Execution(ref req, _) => + update_since(&mut caps.serve_state_since, req.header.number()), } } + + caps } /// On demand request service. See module docs for more details. /// Accumulates info about all peers' capabilities and dispatches /// requests to them accordingly. +// lock in declaration order. pub struct OnDemand { + pending: RwLock>, peers: RwLock>, - pending_requests: RwLock>, + in_transit: RwLock>, cache: Arc>, - orphaned_requests: RwLock>, - start_nonce: U256, } const RECEIVER_IN_SCOPE: &'static str = "Receiver is still in scope, so it's not dropped; qed"; impl OnDemand { /// Create a new `OnDemand` service with the given cache. - pub fn new(cache: Arc>, account_start_nonce: U256) -> Self { + pub fn new(cache: Arc>) -> Self { OnDemand { + pending: RwLock::new(Vec::new()), peers: RwLock::new(HashMap::new()), - pending_requests: RwLock::new(HashMap::new()), + in_transit: RwLock::new(HashMap::new()), cache: cache, - orphaned_requests: RwLock::new(Vec::new()), - start_nonce: account_start_nonce, } } - /// Request a header's hash by block number and CHT root hash. - /// Returns the hash. - pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver { - let (sender, receiver) = oneshot::channel(); - let cached = { - let mut cache = self.cache.lock(); - cache.block_hash(&req.num()) - }; + // /// Request a header's hash by block number and CHT root hash. + // /// Returns the hash. + // pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver { + // let (sender, receiver) = oneshot::channel(); + // let cached = { + // let mut cache = self.cache.lock(); + // cache.block_hash(&req.num()) + // }; - match cached { - Some(hash) => sender.send(hash).expect(RECEIVER_IN_SCOPE), - None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Hash(sender))), - } - receiver - } + // match cached { + // Some(hash) => sender.send(hash).expect(RECEIVER_IN_SCOPE), + // None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Hash(sender))), + // } + // receiver + // } - /// Request a canonical block's chain score. - /// Returns the chain score. - pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver { - let (sender, receiver) = oneshot::channel(); - let cached = { - let mut cache = self.cache.lock(); - cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash)) - }; + // /// Request a canonical block's chain score. + // /// Returns the chain score. + // pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver { + // let (sender, receiver) = oneshot::channel(); + // let cached = { + // let mut cache = self.cache.lock(); + // cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash)) + // }; - match cached { - Some(score) => sender.send(score).expect(RECEIVER_IN_SCOPE), - None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::ChainScore(sender))), - } + // match cached { + // Some(score) => sender.send(score).expect(RECEIVER_IN_SCOPE), + // None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::ChainScore(sender))), + // } - receiver - } + // receiver + // } - /// Request a canonical block's hash and chain score by number. - /// Returns the hash and chain score. - pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<(H256, U256)> { - let (sender, receiver) = oneshot::channel(); - let cached = { - let mut cache = self.cache.lock(); - let hash = cache.block_hash(&req.num()); - ( - hash.clone(), - hash.and_then(|hash| cache.chain_score(&hash)), - ) - }; + // /// Request a canonical block's hash and chain score by number. + // /// Returns the hash and chain score. + // pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<(H256, U256)> { + // let (sender, receiver) = oneshot::channel(); + // let cached = { + // let mut cache = self.cache.lock(); + // let hash = cache.block_hash(&req.num()); + // ( + // hash.clone(), + // hash.and_then(|hash| cache.chain_score(&hash)), + // ) + // }; - match cached { - (Some(hash), Some(score)) => sender.send((hash, score)).expect(RECEIVER_IN_SCOPE), - _ => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Both(sender))), - } + // match cached { + // (Some(hash), Some(score)) => sender.send((hash, score)).expect(RECEIVER_IN_SCOPE), + // _ => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Both(sender))), + // } - receiver - } + // receiver + // } - /// Request a header by hash. This is less accurate than by-number because we don't know - /// where in the chain this header lies, and therefore can't find a peer who is supposed to have - /// it as easily. - pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver { - let (sender, receiver) = oneshot::channel(); - match { self.cache.lock().block_header(&req.0) } { - Some(hdr) => sender.send(hdr).expect(RECEIVER_IN_SCOPE), - None => self.dispatch(ctx, Pending::HeaderByHash(req, sender)), - } - receiver - } + // /// Request a header by hash. This is less accurate than by-number because we don't know + // /// where in the chain this header lies, and therefore can't find a peer who is supposed to have + // /// it as easily. + // pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver { + // let (sender, receiver) = oneshot::channel(); + // match { self.cache.lock().block_header(&req.0) } { + // Some(hdr) => sender.send(hdr).expect(RECEIVER_IN_SCOPE), + // None => self.dispatch(ctx, Pending::HeaderByHash(req, sender)), + // } + // receiver + // } - /// Request a block, given its header. Block bodies are requestable by hash only, - /// and the header is required anyway to verify and complete the block body - /// -- this just doesn't obscure the network query. - pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Receiver { + // /// Request a block, given its header. Block bodies are requestable by hash only, + // /// and the header is required anyway to verify and complete the block body + // /// -- this just doesn't obscure the network query. + // pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Receiver { + // let (sender, receiver) = oneshot::channel(); + + // // fast path for empty body. + // if req.header.transactions_root() == SHA3_NULL_RLP && req.header.uncles_hash() == SHA3_EMPTY_LIST_RLP { + // let mut stream = RlpStream::new_list(3); + // stream.append_raw(&req.header.into_inner(), 1); + // stream.begin_list(0); + // stream.begin_list(0); + + // sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE); + // } else { + // match { self.cache.lock().block_body(&req.hash) } { + // Some(body) => { + // let mut stream = RlpStream::new_list(3); + // let body = body.rlp(); + // stream.append_raw(&req.header.into_inner(), 1); + // stream.append_raw(&body.at(0).as_raw(), 1); + // stream.append_raw(&body.at(1).as_raw(), 1); + + // sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE); + // } + // None => self.dispatch(ctx, Pending::Block(req, sender)), + // } + // } + // receiver + // } + + // /// Request the receipts for a block. The header serves two purposes: + // /// provide the block hash to fetch receipts for, and for verification of the receipts root. + // pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Receiver> { + // let (sender, receiver) = oneshot::channel(); + + // // fast path for empty receipts. + // if req.0.receipts_root() == SHA3_NULL_RLP { + // sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE); + // } else { + // match { self.cache.lock().block_receipts(&req.0.hash()) } { + // Some(receipts) => sender.send(receipts).expect(RECEIVER_IN_SCOPE), + // None => self.dispatch(ctx, Pending::BlockReceipts(req, sender)), + // } + // } + + // receiver + // } + + // /// Request an account by address and block header -- which gives a hash to query and a state root + // /// to verify against. + // pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver { + // let (sender, receiver) = oneshot::channel(); + // self.dispatch(ctx, Pending::Account(req, sender)); + // receiver + // } + + // /// Request code by address, known code hash, and block header. + // pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Receiver { + // let (sender, receiver) = oneshot::channel(); + + // // fast path for no code. + // if req.code_hash == SHA3_EMPTY { + // sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE) + // } else { + // self.dispatch(ctx, Pending::Code(req, sender)); + // } + + // receiver + // } + + // /// Request proof-of-execution for a transaction. + // pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> Receiver> { + // let (sender, receiver) = oneshot::channel(); + + // self.dispatch(ctx, Pending::TxProof(req, sender)); + + // receiver + // } + + /// Submit a batch of requests. + /// + /// Fails if back-references are not coherent. + /// The returned vector of responses will match the requests exactly. + pub fn make_requests(&self, ctx: &BasicContext, requests: Vec) + -> Result>, basic_request::NoSuchOutput> + { let (sender, receiver) = oneshot::channel(); - // fast path for empty body. - if req.header.transactions_root() == SHA3_NULL_RLP && req.header.uncles_hash() == SHA3_EMPTY_LIST_RLP { - let mut stream = RlpStream::new_list(3); - stream.append_raw(&req.header.into_inner(), 1); - stream.begin_list(0); - stream.begin_list(0); - - sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE); - } else { - match { self.cache.lock().block_body(&req.hash) } { - Some(body) => { - let mut stream = RlpStream::new_list(3); - let body = body.rlp(); - stream.append_raw(&req.header.into_inner(), 1); - stream.append_raw(&body.at(0).as_raw(), 1); - stream.append_raw(&body.at(1).as_raw(), 1); - - sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE); - } - None => self.dispatch(ctx, Pending::Block(req, sender)), - } - } - receiver - } - - /// Request the receipts for a block. The header serves two purposes: - /// provide the block hash to fetch receipts for, and for verification of the receipts root. - pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Receiver> { - let (sender, receiver) = oneshot::channel(); - - // fast path for empty receipts. - if req.0.receipts_root() == SHA3_NULL_RLP { - sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE); - } else { - match { self.cache.lock().block_receipts(&req.0.hash()) } { - Some(receipts) => sender.send(receipts).expect(RECEIVER_IN_SCOPE), - None => self.dispatch(ctx, Pending::BlockReceipts(req, sender)), - } - } - - receiver - } - - /// Request an account by address and block header -- which gives a hash to query and a state root - /// to verify against. - pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver { - let (sender, receiver) = oneshot::channel(); - self.dispatch(ctx, Pending::Account(req, sender)); - receiver - } - - /// Request code by address, known code hash, and block header. - pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Receiver { - let (sender, receiver) = oneshot::channel(); - - // fast path for no code. - if req.code_hash == SHA3_EMPTY { - sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE) - } else { - self.dispatch(ctx, Pending::Code(req, sender)); - } - - receiver - } - - /// Request proof-of-execution for a transaction. - pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> Receiver> { - let (sender, receiver) = oneshot::channel(); - - self.dispatch(ctx, Pending::TxProof(req, sender)); - - receiver - } - - // dispatch the request, with a "suitability" function to filter acceptable peers. - fn dispatch(&self, ctx: &BasicContext, pending: Pending) { let mut builder = basic_request::RequestBuilder::default(); - builder.push(pending.make_request()) - .expect("make_request always returns fully complete request; qed"); - let complete = builder.build(); - - let kind = complete.requests()[0].kind(); - for (id, peer) in self.peers.read().iter() { - if !peer.can_handle(&pending) { continue } - match ctx.request_from(*id, complete.clone()) { - Ok(req_id) => { - trace!(target: "on_demand", "{}: Assigned {:?} to peer {}", - req_id, kind, id); - - self.pending_requests.write().insert( - req_id, - pending, - ); - return - } - Err(net::Error::NoCredits) => {} - Err(e) => - trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), - } + let responses = Vec::with_capacity(requests.len()); + for request in requests { + builder.push(CheckedRequest::from(request))?; } - self.orphaned_requests.write().push(pending); + let requests = builder.build(); + let net_requests = requests.clone().map_requests(|req| req.into_net_request()); + let capabilities = guess_capabilities(requests.requests()); + + self.pending.write().push(Pending { + requests: requests, + net_requests: net_requests, + required_capabilities: capabilities, + responses: responses, + sender: sender, + }); + + Ok(receiver) } - - // dispatch orphaned requests, and discard those for which the corresponding + // dispatch pending requests, and discard those for which the corresponding // receiver has been dropped. - fn dispatch_orphaned(&self, ctx: &BasicContext) { + fn dispatch_pending(&self, ctx: &BasicContext) { // wrapper future for calling `poll_cancel` on our `Senders` to preserve // the invariant that it's always within a task. struct CheckHangup<'a, T: 'a>(&'a mut Sender); @@ -356,35 +342,42 @@ impl OnDemand { CheckHangup(send).wait().expect("CheckHangup always returns ok; qed") } - if self.orphaned_requests.read().is_empty() { return } + if self.pending.read().is_empty() { return } + let mut pending = self.pending.write(); - let to_dispatch = ::std::mem::replace(&mut *self.orphaned_requests.write(), Vec::new()); + // iterate over all pending requests, and check them for hang-up. + // then, try and find a peer who can serve it. + let peers = self.peers.read(); + *pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter() + .filter_map(|mut pending| match check_hangup(&mut pending.sender) { + true => Some(pending), + false => None, + }) + .filter_map(|pending| { + for (peer_id, peer) in peers.iter() { // .shuffle? + if !peer.can_fulfill(&pending.required_capabilities) { + continue + } - trace!(target: "on_demand", "Attempting to dispatch {} orphaned requests.", to_dispatch.len()); - for mut orphaned in to_dispatch { - let hung_up = match orphaned { - Pending::HeaderProof(_, ref mut sender) => match *sender { - ChtProofSender::Both(ref mut s) => check_hangup(s), - ChtProofSender::Hash(ref mut s) => check_hangup(s), - ChtProofSender::ChainScore(ref mut s) => check_hangup(s), - }, - Pending::HeaderByHash(_, ref mut sender) => check_hangup(sender), - Pending::Block(_, ref mut sender) => check_hangup(sender), - Pending::BlockReceipts(_, ref mut sender) => check_hangup(sender), - Pending::Account(_, ref mut sender) => check_hangup(sender), - Pending::Code(_, ref mut sender) => check_hangup(sender), - Pending::TxProof(_, ref mut sender) => check_hangup(sender), - }; - - if !hung_up { self.dispatch(ctx, orphaned) } - } + match ctx.request_from(*peer_id, pending.net_requests.clone()) { + Ok(req_id) => { + self.in_transit.write().insert(req_id, pending); + return None + } + Err(net::Error::NoCredits) => {} + Err(e) => debug!(target: "on_demand", "Error dispatching request to peer: {}", e), + } + } + Some(pending) + }) + .collect(); // `pending` now contains all requests we couldn't dispatch. } } impl Handler for OnDemand { fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() }); - self.dispatch_orphaned(ctx.as_basic()); + self.dispatch_pending(ctx.as_basic()); } fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { @@ -392,16 +385,16 @@ impl Handler for OnDemand { let ctx = ctx.as_basic(); { - let mut orphaned = self.orphaned_requests.write(); + let mut pending = self.pending.write(); for unfulfilled in unfulfilled { - if let Some(pending) = self.pending_requests.write().remove(unfulfilled) { + if let Some(unfulfilled) = self.in_transit.write().remove(unfulfilled) { trace!(target: "on_demand", "Attempting to reassign dropped request"); - orphaned.push(pending); + pending.push(unfulfilled); } } } - self.dispatch_orphaned(ctx); + self.dispatch_pending(ctx); } fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { @@ -413,142 +406,68 @@ impl Handler for OnDemand { } } - self.dispatch_orphaned(ctx.as_basic()); + self.dispatch_pending(ctx.as_basic()); } fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) { + use request::IncompleteRequest; + let peer = ctx.peer(); - let req = match self.pending_requests.write().remove(&req_id) { + let mut pending = match self.in_transit.write().remove(&req_id) { Some(req) => req, None => return, }; - let response = match responses.get(0) { - Some(response) => response, - None => { - trace!(target: "on_demand", "Ignoring empty response for request {}", req_id); - self.dispatch(ctx.as_basic(), req); - return; - } - }; + // for each incoming response + // 1. ensure verification data filled. + // 2. pending.requests.supply_response + // 3. if extracted on-demand response + for response in responses { + match pending.requests.supply_response(response) { + Ok(response) => pending.responses.push(response), + Err(e) => { + let peer = ctx.peer(); + debug!(target: "on_demand", "Peer {} gave bad response: {:?}", peer, e); + ctx.disable_peer(peer); - trace!(target: "on_demand", "Handling response for request {}, kind={:?}", req_id, response.kind()); - - // handle the response appropriately for the request. - // all branches which do not return early lead to disabling of the peer - // due to misbehavior. - match req { - Pending::HeaderProof(req, sender) => { - if let NetworkResponse::HeaderProof(ref response) = *response { - match req.check_response(&response.proof) { - Ok((hash, score)) => { - let mut cache = self.cache.lock(); - cache.insert_block_hash(req.num(), hash); - cache.insert_chain_score(hash, score); - - match sender { - ChtProofSender::Both(sender) => { let _ = sender.send((hash, score)); } - ChtProofSender::Hash(sender) => { let _ = sender.send(hash); } - ChtProofSender::ChainScore(sender) => { let _ = sender.send(score); } - } - return - } - Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e), - } - } - } - Pending::HeaderByHash(req, sender) => { - if let NetworkResponse::Headers(ref response) = *response { - match req.check_response(&response.headers) { - Ok(header) => { - self.cache.lock().insert_block_header(req.0, header.clone()); - let _ = sender.send(header); - return - } - Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e), - } - } - } - Pending::Block(req, sender) => { - if let NetworkResponse::Body(ref response) = *response { - match req.check_response(&response.body) { - Ok(block) => { - self.cache.lock().insert_block_body(req.hash, response.body.clone()); - let _ = sender.send(block); - return - } - Err(e) => warn!(target: "on_demand", "Error handling response for block request: {:?}", e), - } - } - } - Pending::BlockReceipts(req, sender) => { - if let NetworkResponse::Receipts(ref response) = *response { - match req.check_response(&response.receipts) { - Ok(receipts) => { - let hash = req.0.hash(); - self.cache.lock().insert_block_receipts(hash, receipts.clone()); - let _ = sender.send(receipts); - return - } - Err(e) => warn!(target: "on_demand", "Error handling response for receipts request: {:?}", e), - } - } - } - Pending::Account(req, sender) => { - if let NetworkResponse::Account(ref response) = *response { - match req.check_response(&response.proof) { - Ok(account) => { - let account = account.unwrap_or_else(|| { - BasicAccount { - balance: 0.into(), - nonce: self.start_nonce, - code_hash: SHA3_EMPTY, - storage_root: SHA3_NULL_RLP - } - }); - - // TODO: validate against request outputs. - // needs engine + env info as part of request. - let _ = sender.send(account); - return - } - Err(e) => warn!(target: "on_demand", "Error handling response for state request: {:?}", e), - } - } - } - Pending::Code(req, sender) => { - if let NetworkResponse::Code(ref response) = *response { - match req.check_response(response.code.as_slice()) { - Ok(code) => { - let _ = sender.send(code); - return - } - Err(e) => warn!(target: "on_demand", "Error handling response for code request: {:?}", e), - } - } - } - Pending::TxProof(req, sender) => { - if let NetworkResponse::Execution(ref response) = *response { - match req.check_response(&response.items) { - ProvedExecution::Complete(executed) => { - let _ = sender.send(Ok(executed)); - return - } - ProvedExecution::Failed(err) => { - let _ = sender.send(Err(err)); - return - } - ProvedExecution::BadProof => warn!(target: "on_demand", "Error handling response for transaction proof request"), - } + break; } } } - ctx.disable_peer(peer); + if pending.requests.is_complete() { + let _ = pending.sender.send(pending.responses); + + return; + } + + // update network requests (unless we're done, in which case fulfill the future.) + let mut builder = basic_request::RequestBuilder::default(); + let num_answered = pending.requests.num_answered(); + let mut mapping = move |idx| idx - num_answered; + + for request in pending.requests.requests().iter().skip(num_answered) { + let mut net_req = request.clone().into_net_request(); + + // all back-references with request index less than `num_answered` have + // been filled by now. all remaining requests point to nothing earlier + // than the next unanswered request. + net_req.adjust_refs(&mut mapping); + builder.push(net_req) + .expect("all back-references to answered requests have been filled; qed"); + } + + // update pending fields and re-queue. + let capabilities = guess_capabilities(&pending.requests.requests()[num_answered..]); + pending.net_requests = builder.build(); + pending.required_capabilities = capabilities; + + self.pending.write().push(pending); + self.dispatch_pending(ctx.as_basic()); } fn tick(&self, ctx: &BasicContext) { - self.dispatch_orphaned(ctx) + self.dispatch_pending(ctx) } } @@ -587,7 +506,7 @@ mod tests { assert!(on_demand.orphaned_requests.read().len() == 1); drop(result); - on_demand.dispatch_orphaned(&FakeContext); + on_demand.dispatch_pending(&FakeContext); assert!(on_demand.orphaned_requests.read().is_empty()); } } diff --git a/ethcore/light/src/on_demand/request.rs b/ethcore/light/src/on_demand/request.rs index ea5214786..ad348445c 100644 --- a/ethcore/light/src/on_demand/request.rs +++ b/ethcore/light/src/on_demand/request.rs @@ -126,6 +126,23 @@ impl From for CheckedRequest { } } +impl CheckedRequest { + /// Convert this into a network request. + pub fn into_net_request(self) -> net_request::Request { + use ::request::Request as NetRequest; + + match self { + CheckedRequest::HeaderProof(_, req) => NetRequest::HeaderProof(req), + CheckedRequest::HeaderByHash(_, req) => NetRequest::Headers(req), + CheckedRequest::Receipts(_, req) => NetRequest::Receipts(req), + CheckedRequest::Body(_, req) => NetRequest::Body(req), + CheckedRequest::Account(_, req) => NetRequest::Account(req), + CheckedRequest::Code(_, req) => NetRequest::Code(req), + CheckedRequest::Execution(_, req) => NetRequest::Execution(req), + } + } +} + impl IncompleteRequest for CheckedRequest { type Complete = net_request::CompleteRequest; type Response = net_request::Response; @@ -192,6 +209,19 @@ impl IncompleteRequest for CheckedRequest { CheckedRequest::Execution(_, req) => req.complete().map(CompleteRequest::Execution), } } + + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + match *self { + CheckedRequest::HeaderProof(_, ref mut req) => req.adjust_refs(mapping), + CheckedRequest::HeaderByHash(_, ref mut req) => req.adjust_refs(mapping), + CheckedRequest::Receipts(_, ref mut req) => req.adjust_refs(mapping), + CheckedRequest::Body(_, ref mut req) => req.adjust_refs(mapping), + CheckedRequest::Account(_, ref mut req) => req.adjust_refs(mapping), + CheckedRequest::Code(_, ref mut req) => req.adjust_refs(mapping), + CheckedRequest::Execution(_, ref mut req) => req.adjust_refs(mapping), + } + } } impl net_request::CheckedRequest for CheckedRequest { diff --git a/ethcore/light/src/types/request/builder.rs b/ethcore/light/src/types/request/builder.rs index ec271f0a3..6a40d288e 100644 --- a/ethcore/light/src/types/request/builder.rs +++ b/ethcore/light/src/types/request/builder.rs @@ -87,9 +87,14 @@ impl Requests { /// Get the number of answered requests. pub fn num_answered(&self) -> usize { self.answered } + /// Whether the batch is complete. + pub fn is_complete(&self) -> bool { + self.answered == self.requests.len() + } + /// Get the next request as a filled request. Returns `None` when all requests answered. pub fn next_complete(&self) -> Option { - if self.answered == self.requests.len() { + if self.is_complete() { None } else { Some(self.requests[self.answered].clone() @@ -97,6 +102,17 @@ impl Requests { .expect("All outputs checked as invariant of `Requests` object; qed")) } } + + /// Map requests from one type into another. + pub fn map_requests(self, f: F) -> Requests + where F: FnMut(T) -> U, U: IncompleteRequest + { + Requests { + outputs: self.outputs, + requests: self.requests.into_iter().map(f).collect(), + answered: self.answered, + } + } } impl Requests { @@ -122,8 +138,8 @@ impl Requests { self.answered += 1; - // fill as much of the next request as we can. - if let Some(ref mut req) = self.requests.get_mut(self.answered) { + // fill as much of each remaining request as we can. + for req in self.requests.iter_mut().skip(self.answered) { req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput)) } diff --git a/ethcore/light/src/types/request/mod.rs b/ethcore/light/src/types/request/mod.rs index 38a54b52c..f26908acd 100644 --- a/ethcore/light/src/types/request/mod.rs +++ b/ethcore/light/src/types/request/mod.rs @@ -100,6 +100,12 @@ impl Field { _ => Err(NoSuchOutput), } } + + fn adjust_req(&mut self, mut mapping: F) where F: FnMut(usize) -> usize { + if let Field::BackReference(ref mut req_idx, _) = *self { + *req_idx = mapping(*req_idx) + } + } } impl From for Field { @@ -358,6 +364,19 @@ impl IncompleteRequest for Request { Request::Execution(req) => req.complete().map(CompleteRequest::Execution), } } + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + match *self { + Request::Headers(ref mut req) => req.adjust_refs(mapping), + Request::HeaderProof(ref mut req) => req.adjust_refs(mapping), + Request::Receipts(ref mut req) => req.adjust_refs(mapping), + Request::Body(ref mut req) => req.adjust_refs(mapping), + Request::Account(ref mut req) => req.adjust_refs(mapping), + Request::Storage(ref mut req) => req.adjust_refs(mapping), + Request::Code(ref mut req) => req.adjust_refs(mapping), + Request::Execution(ref mut req) => req.adjust_refs(mapping), + } + } } impl CheckedRequest for Request { @@ -536,6 +555,9 @@ pub trait IncompleteRequest: Sized { /// Attempt to convert this request into its complete variant. /// Will succeed if all fields have been filled, will fail otherwise. fn complete(self) -> Result; + + /// Adjust back-reference request indices. + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize; } /// A request which can be checked against its response for more validity. @@ -631,6 +653,10 @@ pub mod header { reverse: self.reverse, }) } + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + self.start.adjust_req(mapping) + } } /// A complete header request. @@ -745,6 +771,10 @@ pub mod header_proof { num: self.num.into_scalar()?, }) } + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + self.num.adjust_req(mapping) + } } /// A complete header proof request. @@ -849,6 +879,10 @@ pub mod block_receipts { hash: self.hash.into_scalar()?, }) } + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + self.hash.adjust_req(mapping) + } } /// A complete block receipts request. @@ -942,6 +976,10 @@ pub mod block_body { hash: self.hash.into_scalar()?, }) } + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + self.hash.adjust_req(mapping) + } } /// A complete block body request. @@ -1062,6 +1100,11 @@ pub mod account { address_hash: self.address_hash.into_scalar()?, }) } + + fn adjust_refs(&mut self, mut mapping: F) where F: FnMut(usize) -> usize { + self.block_hash.adjust_req(&mut mapping); + self.address_hash.adjust_req(&mut mapping); + } } /// A complete request for an account. @@ -1212,6 +1255,12 @@ pub mod storage { key_hash: self.key_hash.into_scalar()?, }) } + + fn adjust_refs(&mut self, mut mapping: F) where F: FnMut(usize) -> usize { + self.block_hash.adjust_req(&mut mapping); + self.address_hash.adjust_req(&mut mapping); + self.key_hash.adjust_req(&mut mapping); + } } /// A complete request for a storage proof. @@ -1332,6 +1381,11 @@ pub mod contract_code { code_hash: self.code_hash.into_scalar()?, }) } + + fn adjust_refs(&mut self, mut mapping: F) where F: FnMut(usize) -> usize { + self.block_hash.adjust_req(&mut mapping); + self.code_hash.adjust_req(&mut mapping); + } } /// A complete request. @@ -1464,6 +1518,10 @@ pub mod execution { data: self.data, }) } + + fn adjust_refs(&mut self, mapping: F) where F: FnMut(usize) -> usize { + self.block_hash.adjust_req(mapping); + } } /// A complete request.