implement requesting from

This commit is contained in:
Robert Habermeier 2017-03-08 17:37:07 +01:00
parent ee034185a5
commit bb39f104f4
6 changed files with 92 additions and 56 deletions

View File

@ -21,6 +21,7 @@ use network::{NetworkContext, PeerId, NodeId};
use super::{Announcement, LightProtocol, ReqId};
use super::error::Error;
use request::{self, Request};
use request_builder::Requests;
/// An I/O context which allows sending and receiving packets as well as
/// disconnecting peers. This is used as a generalization of the portions
@ -83,7 +84,12 @@ pub trait BasicContext {
fn persistent_peer_id(&self, peer: PeerId) -> Option<NodeId>;
/// Make a request from a peer.
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error>;
///
/// Fails on: nonexistent peer, network error, peer not server,
/// insufficient credits. Does not check capabilities before sending.
/// On success, returns a request id which can later be coordinated
/// with an event.
fn request_from(&self, peer: PeerId, request: Requests) -> Result<ReqId, Error>;
/// Make an announcement of new capabilities to the rest of the peers.
// TODO: maybe just put this on a timer in LightProtocol?
@ -119,8 +125,8 @@ impl<'a> BasicContext for TickCtx<'a> {
self.io.persistent_peer_id(id)
}
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error> {
self.proto.request_from(self.io, &peer, request)
fn request_from(&self, peer: PeerId, requests: Requests) -> Result<ReqId, Error> {
self.proto.request_from(self.io, &peer, requests)
}
fn make_announcement(&self, announcement: Announcement) {
@ -152,8 +158,8 @@ impl<'a> BasicContext for Ctx<'a> {
self.io.persistent_peer_id(id)
}
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error> {
self.proto.request_from(self.io, &peer, request)
fn request_from(&self, peer: PeerId, requests: Requests) -> Result<ReqId, Error> {
self.proto.request_from(self.io, &peer, requests)
}
fn make_announcement(&self, announcement: Announcement) {

View File

@ -35,6 +35,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use provider::Provider;
use request::{self, HashOrNumber, Request, Response};
use request_builder::Requests;
use self::request_credits::{Credits, FlowParams};
use self::context::{Ctx, TickCtx};
@ -71,8 +72,8 @@ pub const PROTOCOL_VERSIONS: &'static [u8] = &[1];
/// Max protocol version.
pub const MAX_PROTOCOL_VERSION: u8 = 1;
/// Packet count for LES.
pub const PACKET_COUNT: u8 = 17;
/// Packet count for PIP.
pub const PACKET_COUNT: u8 = 5;
// packet ID definitions.
mod packet {
@ -88,24 +89,21 @@ mod packet {
// relay transactions to peers.
pub const SEND_TRANSACTIONS: u8 = 0x04;
// request and response for transaction proof.
// TODO: merge with request/response.
pub const GET_TRANSACTION_PROOF: u8 = 0x05;
pub const TRANSACTION_PROOF: u8 = 0x06;
}
// timeouts for different kinds of requests. all values are in milliseconds.
// TODO: variable timeouts based on request count.
mod timeout {
pub const HANDSHAKE: i64 = 2500;
pub const HEADERS: i64 = 2500;
pub const BODIES: i64 = 5000;
pub const RECEIPTS: i64 = 3500;
pub const PROOFS: i64 = 4000;
pub const CONTRACT_CODES: i64 = 5000;
pub const HEADER_PROOFS: i64 = 3500;
pub const TRANSACTION_PROOF: i64 = 5000;
pub const BASE: i64 = 1500; // base timeout for packet.
// timeouts per request within packet.
pub const HEADERS: i64 = 250; // per header?
pub const BODY: i64 = 50;
pub const RECEIPT: i64 = 50;
pub const PROOF: i64 = 100; // state proof
pub const CONTRACT_CODE: i64 = 100;
pub const HEADER_PROOF: i64 = 100;
pub const TRANSACTION_PROOF: i64 = 1000; // per gas?
}
/// A request id.
@ -138,16 +136,7 @@ pub struct Peer {
failed_requests: Vec<ReqId>,
}
impl Peer {
// refund credits for a request. returns new amount of credits.
fn refund(&mut self, flow_params: &FlowParams, amount: U256) -> U256 {
flow_params.refund(&mut self.local_credits, amount);
self.local_credits.current()
}
}
/// An LES event handler.
/// A light protocol event handler.
///
/// Each handler function takes a context which describes the relevant peer
/// and gives references to the IO layer and protocol structure so new messages
@ -304,9 +293,37 @@ impl LightProtocol {
/// insufficient credits. Does not check capabilities before sending.
/// On success, returns a request id which can later be coordinated
/// with an event.
// TODO: pass `Requests`.
pub fn request_from(&self, io: &IoContext, peer_id: &PeerId, request: Request) -> Result<ReqId, Error> {
unimplemented!()
pub fn request_from(&self, io: &IoContext, peer_id: &PeerId, requests: Requests) -> Result<ReqId, Error> {
let peers = self.peers.read();
let peer = match peers.get(peer_id) {
Some(peer) => peer,
None => return Err(Error::UnknownPeer),
};
let mut peer = peer.lock();
let peer = &mut *peer;
match peer.remote_flow {
None => Err(Error::NotServer),
Some((ref mut creds, ref params)) => {
// check that enough credits are available.
let mut temp_creds: Credits = creds.clone();
for request in requests.requests() {
temp_creds.deduct_cost(params.compute_cost(request))?;
}
*creds = temp_creds;
let req_id = ReqId(self.req_id.fetch_add(1, Ordering::SeqCst));
io.send(*peer_id, packet::REQUEST, {
let mut stream = RlpStream::new_list(2);
stream.append(&req_id.0).append(&requests.requests());
stream.out()
});
// begin timeout.
peer.pending_requests.insert(req_id, requests, SteadyTime::now());
Ok(req_id)
}
}
}
/// Make an announcement of new chain head and capabilities to all peers.
@ -663,8 +680,6 @@ impl LightProtocol {
let mut peer = peer.lock();
let req_id: u64 = raw.val_at(0)?;
let mut cumulative_cost = U256::from(0);
let mut request_builder = RequestBuilder::default();
// deserialize requests, check costs and request validity.

View File

@ -25,6 +25,7 @@ use std::collections::{BTreeMap, HashMap};
use std::iter::FromIterator;
use request::{self, Request};
use request_builder::Requests;
use net::{timeout, ReqId};
use time::{Duration, SteadyTime};
@ -35,7 +36,7 @@ pub struct RequestSet {
counter: u64,
base: Option<SteadyTime>,
ids: HashMap<ReqId, u64>,
reqs: BTreeMap<u64, Request>,
reqs: BTreeMap<u64, Requests>,
}
impl Default for RequestSet {
@ -50,8 +51,8 @@ impl Default for RequestSet {
}
impl RequestSet {
/// Push a request onto the stack.
pub fn insert(&mut self, req_id: ReqId, req: Request, now: SteadyTime) {
/// Push requests onto the stack.
pub fn insert(&mut self, req_id: ReqId, req: Requests, now: SteadyTime) {
let counter = self.counter;
self.ids.insert(req_id, counter);
self.reqs.insert(counter, req);
@ -63,8 +64,8 @@ impl RequestSet {
self.counter += 1;
}
/// Remove a request from the stack.
pub fn remove(&mut self, req_id: &ReqId, now: SteadyTime) -> Option<Request> {
/// Remove a set of requests from the stack.
pub fn remove(&mut self, req_id: &ReqId, now: SteadyTime) -> Option<Requests> {
let id = match self.ids.remove(&req_id) {
Some(id) => id,
None => return None,
@ -89,7 +90,24 @@ impl RequestSet {
None => return false,
};
unimplemented!()
let first_req = self.reqs.values().next()
.expect("base existing implies `reqs` non-empty; qed");
// timeout is a base + value per request contained within.
let timeout = first_req.requests().iter().fold(timeout::BASE, |tm, req| {
tm + match *req {
Request::Headers(_) => timeout::HEADERS,
Request::HeaderProof(_) => timeout::HEADER_PROOF,
Request::Receipts(_) => timeout::RECEIPT,
Request::Body(_) => timeout::BODY,
Request::Account(_) => timeout::PROOF,
Request::Storage(_) => timeout::PROOF,
Request::Code(_) => timeout::CONTRACT_CODE,
Request::Execution(_) => timeout::TRANSACTION_PROOF,
}
});
base + Duration::milliseconds(timeout) <= now
}
/// Collect all pending request ids.

View File

@ -24,7 +24,7 @@ use ethcore::client::{BlockChainClient, ProvingBlockChainClient};
use ethcore::transaction::PendingTransaction;
use ethcore::ids::BlockId;
use ethcore::encoded;
use util::{Bytes, DBValue, RwLock, H256};
use util::{RwLock, H256};
use cht::{self, BlockInfo};
use client::{LightChainClient, AsLightClient};
@ -297,27 +297,27 @@ impl<L: AsLightClient + Send + Sync> Provider for LightProvider<L> {
self.client.as_light_client().block_header(id)
}
fn block_body(&self, req: request::CompleteBodyRequest) -> Option<request::BodyResponse> {
fn block_body(&self, _req: request::CompleteBodyRequest) -> Option<request::BodyResponse> {
None
}
fn block_receipts(&self, req: request::CompleteReceiptsRequest) -> Option<request::ReceiptsResponse> {
fn block_receipts(&self, _req: request::CompleteReceiptsRequest) -> Option<request::ReceiptsResponse> {
None
}
fn account_proof(&self, req: request::CompleteAccountRequest) -> Option<request::AccountResponse> {
fn account_proof(&self, _req: request::CompleteAccountRequest) -> Option<request::AccountResponse> {
None
}
fn storage_proof(&self, req: request::CompleteStorageRequest) -> Option<request::StorageResponse> {
fn storage_proof(&self, _req: request::CompleteStorageRequest) -> Option<request::StorageResponse> {
None
}
fn contract_code(&self, req: request::CompleteCodeRequest) -> Option<request::CodeResponse> {
fn contract_code(&self, _req: request::CompleteCodeRequest) -> Option<request::CodeResponse> {
None
}
fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse> {
fn header_proof(&self, _req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse> {
None
}

View File

@ -18,10 +18,10 @@
//! Push requests with `push`. Back-references and data required to verify responses must be
//! supplied as well.
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use request::{
IncompleteRequest, CompleteRequest, Request,
Field, OutputKind, Output, NoSuchOutput, Response,
OutputKind, Output, NoSuchOutput, Response,
};
/// Build chained requests. Push them onto the series with `push`,
@ -72,7 +72,7 @@ impl Requests {
/// For each request, produce responses for each.
/// The responses vector produced goes up to the point where the responder
/// first returns `None`, an invalid response, or until all requests have been responded to.
pub fn respond_to_all<F>(mut self, responder: F) -> Vec<Response>
pub fn respond_to_all<F>(self, responder: F) -> Vec<Response>
where F: Fn(CompleteRequest) -> Option<Response>
{
let mut responses = Vec::new();

View File

@ -16,11 +16,8 @@
//! Light protocol request types.
use std::collections::HashMap;
use ethcore::transaction::Action;
use rlp::{Encodable, Decodable, Decoder, DecoderError, RlpStream, Stream, View};
use util::{Address, H256, U256, Uint};
use util::H256;
// re-exports of request types.
pub use self::header::{
@ -391,7 +388,7 @@ pub enum Response {
impl Response {
/// Fill reusable outputs by writing them into the function.
pub fn fill_outputs<F>(&self, mut f: F) where F: FnMut(usize, Output) {
pub fn fill_outputs<F>(&self, f: F) where F: FnMut(usize, Output) {
match *self {
Response::Headers(ref res) => res.fill_outputs(f),
Response::HeaderProof(ref res) => res.fill_outputs(f),