support request sending

This commit is contained in:
Robert Habermeier 2016-11-18 19:12:20 +01:00
parent f1c665081a
commit 4fd9670b33
5 changed files with 223 additions and 5 deletions

View File

@ -206,6 +206,39 @@ impl FlowParams {
cost.0 + (amount * cost.1) cost.0 + (amount * cost.1)
} }
/// Compute the maximum number of costs of a specific kind which can be made
/// with the given buffer.
/// Saturates at `usize::max()`. This is not a problem in practice because
/// this amount of requests is already prohibitively large.
pub fn max_amount(&self, buffer: &Buffer, kind: request::Kind) -> usize {
use util::Uint;
use std::usize;
let cost = match kind {
request::Kind::Headers => &self.costs.headers,
request::Kind::Bodies => &self.costs.bodies,
request::Kind::Receipts => &self.costs.receipts,
request::Kind::StateProofs => &self.costs.state_proofs,
request::Kind::Codes => &self.costs.contract_codes,
request::Kind::HeaderProofs => &self.costs.header_proofs,
};
let start = buffer.current();
if start <= cost.0 {
return 0;
} else if cost.1 == U256::zero() {
return usize::MAX;
}
let max = (start - cost.0) / cost.1;
if max >= usize::MAX.into() {
usize::MAX
} else {
max.as_u64() as usize
}
}
/// Create initial buffer parameter. /// Create initial buffer parameter.
pub fn create_buffer(&self) -> Buffer { pub fn create_buffer(&self) -> Buffer {
Buffer { Buffer {

View File

@ -52,6 +52,8 @@ pub enum Error {
UnexpectedHandshake, UnexpectedHandshake,
/// Peer on wrong network (wrong NetworkId or genesis hash) /// Peer on wrong network (wrong NetworkId or genesis hash)
WrongNetwork, WrongNetwork,
/// Unknown peer.
UnknownPeer,
} }
impl Error { impl Error {
@ -64,6 +66,7 @@ impl Error {
Error::UnrecognizedPacket(_) => Punishment::Disconnect, Error::UnrecognizedPacket(_) => Punishment::Disconnect,
Error::UnexpectedHandshake => Punishment::Disconnect, Error::UnexpectedHandshake => Punishment::Disconnect,
Error::WrongNetwork => Punishment::Disable, Error::WrongNetwork => Punishment::Disable,
Error::UnknownPeer => Punishment::Disconnect,
} }
} }
} }
@ -89,6 +92,7 @@ impl fmt::Display for Error {
Error::UnrecognizedPacket(code) => write!(f, "Unrecognized packet: 0x{:x}", code), Error::UnrecognizedPacket(code) => write!(f, "Unrecognized packet: 0x{:x}", code),
Error::UnexpectedHandshake => write!(f, "Unexpected handshake"), Error::UnexpectedHandshake => write!(f, "Unexpected handshake"),
Error::WrongNetwork => write!(f, "Wrong network"), Error::WrongNetwork => write!(f, "Wrong network"),
Error::UnknownPeer => write!(f, "unknown peer"),
} }
} }
} }

View File

@ -24,9 +24,10 @@ use network::{NetworkProtocolHandler, NetworkContext, NetworkError, PeerId};
use rlp::{RlpStream, Stream, UntrustedRlp, View}; use rlp::{RlpStream, Stream, UntrustedRlp, View};
use util::hash::H256; use util::hash::H256;
use util::{Mutex, RwLock, U256}; use util::{Mutex, RwLock, U256};
use time::SteadyTime;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicUsize; use std::sync::atomic::{AtomicUsize, Ordering};
use light::provider::Provider; use light::provider::Provider;
use light::request::{self, Request}; use light::request::{self, Request};
@ -39,7 +40,7 @@ mod buffer_flow;
mod error; mod error;
mod status; mod status;
pub use self::status::{Status, Capabilities, Announcement}; pub use self::status::{Status, Capabilities, Announcement, NetworkId};
const TIMEOUT: TimerToken = 0; const TIMEOUT: TimerToken = 0;
const TIMEOUT_INTERVAL_MS: u64 = 1000; const TIMEOUT_INTERVAL_MS: u64 = 1000;
@ -86,6 +87,10 @@ mod packet {
pub const HEADER_PROOFS: u8 = 0x0e; pub const HEADER_PROOFS: u8 = 0x0e;
} }
/// A request id.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ReqId(usize);
// A pending peer: one we've sent our status to but // A pending peer: one we've sent our status to but
// may not have received one for. // may not have received one for.
struct PendingPeer { struct PendingPeer {
@ -137,6 +142,24 @@ pub trait Handler: Send + Sync {
fn on_transactions(&self, _id: PeerId, _relay: &[SignedTransaction]) { } fn on_transactions(&self, _id: PeerId, _relay: &[SignedTransaction]) { }
} }
// a request and the time it was made.
struct Requested {
request: Request,
timestamp: SteadyTime,
}
/// Protocol parameters.
pub struct Params {
/// Genesis hash.
pub genesis_hash: H256,
/// Network id.
pub network_id: NetworkId,
/// Buffer flow parameters.
pub flow_params: FlowParams,
/// Initial capabilities.
pub capabilities: Capabilities,
}
/// This is an implementation of the light ethereum network protocol, abstracted /// This is an implementation of the light ethereum network protocol, abstracted
/// over a `Provider` of data and a p2p network. /// over a `Provider` of data and a p2p network.
/// ///
@ -146,10 +169,10 @@ pub trait Handler: Send + Sync {
pub struct LightProtocol { pub struct LightProtocol {
provider: Box<Provider>, provider: Box<Provider>,
genesis_hash: H256, genesis_hash: H256,
network_id: status::NetworkId, network_id: NetworkId,
pending_peers: RwLock<HashMap<PeerId, PendingPeer>>, pending_peers: RwLock<HashMap<PeerId, PendingPeer>>,
peers: RwLock<HashMap<PeerId, Peer>>, peers: RwLock<HashMap<PeerId, Peer>>,
pending_requests: RwLock<HashMap<usize, Request>>, pending_requests: RwLock<HashMap<usize, Requested>>,
capabilities: RwLock<Capabilities>, capabilities: RwLock<Capabilities>,
flow_params: FlowParams, // assumed static and same for every peer. flow_params: FlowParams, // assumed static and same for every peer.
handlers: Vec<Box<Handler>>, handlers: Vec<Box<Handler>>,
@ -157,9 +180,71 @@ pub struct LightProtocol {
} }
impl LightProtocol { impl LightProtocol {
/// Create a new instance of the protocol manager.
pub fn new(provider: Box<Provider>, params: Params) -> Self {
LightProtocol {
provider: provider,
genesis_hash: params.genesis_hash,
network_id: params.network_id,
pending_peers: RwLock::new(HashMap::new()),
peers: RwLock::new(HashMap::new()),
pending_requests: RwLock::new(HashMap::new()),
capabilities: RwLock::new(params.capabilities),
flow_params: params.flow_params,
handlers: Vec::new(),
req_id: AtomicUsize::new(0),
}
}
/// Check the maximum amount of requests of a specific type
/// which a peer would be able to serve.
pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> Option<usize> {
self.peers.write().get_mut(&peer).map(|peer| {
peer.remote_flow.recharge(&mut peer.remote_buffer);
peer.remote_flow.max_amount(&peer.remote_buffer, kind)
})
}
/// Make a request to a peer.
///
/// Fails on: nonexistent peer, network error,
/// insufficient buffer. Does not check capabilities before sending.
/// On success, returns a request id which can later be coordinated
/// with an event.
pub fn request_from(&self, io: &NetworkContext, peer_id: &PeerId, request: Request) -> Result<ReqId, Error> {
let mut peers = self.peers.write();
let peer = try!(peers.get_mut(peer_id).ok_or_else(|| Error::UnknownPeer));
peer.remote_flow.recharge(&mut peer.remote_buffer);
let max = peer.remote_flow.compute_cost(request.kind(), request.amount());
try!(peer.remote_buffer.deduct_cost(max));
let req_id = self.req_id.fetch_add(1, Ordering::SeqCst);
let packet_data = encode_request(&request, req_id);
let packet_id = match request.kind() {
request::Kind::Headers => packet::GET_BLOCK_HEADERS,
request::Kind::Bodies => packet::GET_BLOCK_BODIES,
request::Kind::Receipts => packet::GET_RECEIPTS,
request::Kind::StateProofs => packet::GET_PROOFS,
request::Kind::Codes => packet::GET_CONTRACT_CODES,
request::Kind::HeaderProofs => packet::GET_HEADER_PROOFS,
};
try!(io.send(*peer_id, packet_id, packet_data));
peer.current_asking.insert(req_id);
self.pending_requests.write().insert(req_id, Requested {
request: request,
timestamp: SteadyTime::now(),
});
Ok(ReqId(req_id))
}
/// Make an announcement of new chain head and capabilities to all peers. /// Make an announcement of new chain head and capabilities to all peers.
/// The announcement is expected to be valid. /// The announcement is expected to be valid.
pub fn make_announcement(&self, mut announcement: Announcement, io: &NetworkContext) { pub fn make_announcement(&self, io: &NetworkContext, mut announcement: Announcement) {
let mut reorgs_map = HashMap::new(); let mut reorgs_map = HashMap::new();
// update stored capabilities // update stored capabilities
@ -715,4 +800,86 @@ impl NetworkProtocolHandler for LightProtocol {
_ => warn!(target: "les", "received timeout on unknown token {}", timer), _ => warn!(target: "les", "received timeout on unknown token {}", timer),
} }
} }
}
// Helper for encoding the request to RLP with the given ID.
fn encode_request(req: &Request, req_id: usize) -> Vec<u8> {
match *req {
Request::Headers(ref headers) => {
let mut stream = RlpStream::new_list(5);
stream
.append(&req_id)
.begin_list(2)
.append(&headers.block_num)
.append(&headers.block_hash)
.append(&headers.max)
.append(&headers.skip)
.append(&headers.reverse);
stream.out()
}
Request::Bodies(ref request) => {
let mut stream = RlpStream::new_list(request.block_hashes.len() + 1);
stream.append(&req_id);
for hash in &request.block_hashes {
stream.append(hash);
}
stream.out()
}
Request::Receipts(ref request) => {
let mut stream = RlpStream::new_list(request.block_hashes.len() + 1);
stream.append(&req_id);
for hash in &request.block_hashes {
stream.append(hash);
}
stream.out()
}
Request::StateProofs(ref request) => {
let mut stream = RlpStream::new_list(request.requests.len() + 1);
stream.append(&req_id);
for proof_req in &request.requests {
stream.begin_list(4)
.append(&proof_req.block)
.append(&proof_req.key1);
match proof_req.key2 {
Some(ref key2) => stream.append(key2),
None => stream.append_empty_data(),
};
stream.append(&proof_req.from_level);
}
stream.out()
}
Request::Codes(ref request) => {
let mut stream = RlpStream::new_list(request.code_requests.len() + 1);
stream.append(&req_id);
for code_req in &request.code_requests {
stream.begin_list(2)
.append(&code_req.block_hash)
.append(&code_req.account_key);
}
stream.out()
}
Request::HeaderProofs(ref request) => {
let mut stream = RlpStream::new_list(request.requests.len() + 1);
stream.append(&req_id);
for proof_req in &request.requests {
stream.begin_list(3)
.append(&proof_req.cht_number)
.append(&proof_req.block_number)
.append(&proof_req.from_level);
}
stream.out()
}
}
} }

View File

@ -183,8 +183,10 @@ pub struct Capabilities {
/// Whether this peer can serve headers /// Whether this peer can serve headers
pub serve_headers: bool, pub serve_headers: bool,
/// Earliest block number it can serve block/receipt requests for. /// Earliest block number it can serve block/receipt requests for.
/// `None` means no requests will be servable.
pub serve_chain_since: Option<u64>, pub serve_chain_since: Option<u64>,
/// Earliest block number it can serve state requests for. /// Earliest block number it can serve state requests for.
/// `None` means no requests will be servable.
pub serve_state_since: Option<u64>, pub serve_state_since: Option<u64>,
/// Whether it can relay transactions to the eth network. /// Whether it can relay transactions to the eth network.
pub tx_relay: bool, pub tx_relay: bool,

View File

@ -152,4 +152,16 @@ impl Request {
Request::HeaderProofs(_) => Kind::HeaderProofs, Request::HeaderProofs(_) => Kind::HeaderProofs,
} }
} }
/// Get the amount of requests being made.
pub fn amount(&self) -> usize {
match *self {
Request::Headers(ref req) => req.max,
Request::Bodies(ref req) => req.block_hashes.len(),
Request::Receipts(ref req) => req.block_hashes.len(),
Request::StateProofs(ref req) => req.requests.len(),
Request::Codes(ref req) => req.code_requests.len(),
Request::HeaderProofs(ref req) => req.requests.len(),
}
}
} }