diff --git a/ethcore/src/light/net/buffer_flow.rs b/ethcore/src/light/net/buffer_flow.rs index 8d6835db3..7f653f826 100644 --- a/ethcore/src/light/net/buffer_flow.rs +++ b/ethcore/src/light/net/buffer_flow.rs @@ -206,6 +206,39 @@ impl FlowParams { cost.0 + (amount * cost.1) } + /// Compute the maximum number of costs of a specific kind which can be made + /// with the given buffer. + /// Saturates at `usize::max()`. This is not a problem in practice because + /// this amount of requests is already prohibitively large. + pub fn max_amount(&self, buffer: &Buffer, kind: request::Kind) -> usize { + use util::Uint; + use std::usize; + + let cost = match kind { + request::Kind::Headers => &self.costs.headers, + request::Kind::Bodies => &self.costs.bodies, + request::Kind::Receipts => &self.costs.receipts, + request::Kind::StateProofs => &self.costs.state_proofs, + request::Kind::Codes => &self.costs.contract_codes, + request::Kind::HeaderProofs => &self.costs.header_proofs, + }; + + let start = buffer.current(); + + if start <= cost.0 { + return 0; + } else if cost.1 == U256::zero() { + return usize::MAX; + } + + let max = (start - cost.0) / cost.1; + if max >= usize::MAX.into() { + usize::MAX + } else { + max.as_u64() as usize + } + } + /// Create initial buffer parameter. pub fn create_buffer(&self) -> Buffer { Buffer { diff --git a/ethcore/src/light/net/error.rs b/ethcore/src/light/net/error.rs index e15bd50d3..0855cdeb8 100644 --- a/ethcore/src/light/net/error.rs +++ b/ethcore/src/light/net/error.rs @@ -52,6 +52,8 @@ pub enum Error { UnexpectedHandshake, /// Peer on wrong network (wrong NetworkId or genesis hash) WrongNetwork, + /// Unknown peer. + UnknownPeer, } impl Error { @@ -64,6 +66,7 @@ impl Error { Error::UnrecognizedPacket(_) => Punishment::Disconnect, Error::UnexpectedHandshake => Punishment::Disconnect, Error::WrongNetwork => Punishment::Disable, + Error::UnknownPeer => Punishment::Disconnect, } } } @@ -89,6 +92,7 @@ impl fmt::Display for Error { Error::UnrecognizedPacket(code) => write!(f, "Unrecognized packet: 0x{:x}", code), Error::UnexpectedHandshake => write!(f, "Unexpected handshake"), Error::WrongNetwork => write!(f, "Wrong network"), + Error::UnknownPeer => write!(f, "unknown peer"), } } } \ No newline at end of file diff --git a/ethcore/src/light/net/mod.rs b/ethcore/src/light/net/mod.rs index 8d369e7bb..a2202d460 100644 --- a/ethcore/src/light/net/mod.rs +++ b/ethcore/src/light/net/mod.rs @@ -24,9 +24,10 @@ use network::{NetworkProtocolHandler, NetworkContext, NetworkError, PeerId}; use rlp::{RlpStream, Stream, UntrustedRlp, View}; use util::hash::H256; use util::{Mutex, RwLock, U256}; +use time::SteadyTime; use std::collections::{HashMap, HashSet}; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicUsize, Ordering}; use light::provider::Provider; use light::request::{self, Request}; @@ -39,7 +40,7 @@ mod buffer_flow; mod error; mod status; -pub use self::status::{Status, Capabilities, Announcement}; +pub use self::status::{Status, Capabilities, Announcement, NetworkId}; const TIMEOUT: TimerToken = 0; const TIMEOUT_INTERVAL_MS: u64 = 1000; @@ -86,6 +87,10 @@ mod packet { pub const HEADER_PROOFS: u8 = 0x0e; } +/// A request id. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ReqId(usize); + // A pending peer: one we've sent our status to but // may not have received one for. struct PendingPeer { @@ -137,6 +142,24 @@ pub trait Handler: Send + Sync { fn on_transactions(&self, _id: PeerId, _relay: &[SignedTransaction]) { } } +// a request and the time it was made. +struct Requested { + request: Request, + timestamp: SteadyTime, +} + +/// Protocol parameters. +pub struct Params { + /// Genesis hash. + pub genesis_hash: H256, + /// Network id. + pub network_id: NetworkId, + /// Buffer flow parameters. + pub flow_params: FlowParams, + /// Initial capabilities. + pub capabilities: Capabilities, +} + /// This is an implementation of the light ethereum network protocol, abstracted /// over a `Provider` of data and a p2p network. /// @@ -146,10 +169,10 @@ pub trait Handler: Send + Sync { pub struct LightProtocol { provider: Box, genesis_hash: H256, - network_id: status::NetworkId, + network_id: NetworkId, pending_peers: RwLock>, peers: RwLock>, - pending_requests: RwLock>, + pending_requests: RwLock>, capabilities: RwLock, flow_params: FlowParams, // assumed static and same for every peer. handlers: Vec>, @@ -157,9 +180,71 @@ pub struct LightProtocol { } impl LightProtocol { + /// Create a new instance of the protocol manager. + pub fn new(provider: Box, params: Params) -> Self { + LightProtocol { + provider: provider, + genesis_hash: params.genesis_hash, + network_id: params.network_id, + pending_peers: RwLock::new(HashMap::new()), + peers: RwLock::new(HashMap::new()), + pending_requests: RwLock::new(HashMap::new()), + capabilities: RwLock::new(params.capabilities), + flow_params: params.flow_params, + handlers: Vec::new(), + req_id: AtomicUsize::new(0), + } + } + + /// Check the maximum amount of requests of a specific type + /// which a peer would be able to serve. + pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> Option { + self.peers.write().get_mut(&peer).map(|peer| { + peer.remote_flow.recharge(&mut peer.remote_buffer); + peer.remote_flow.max_amount(&peer.remote_buffer, kind) + }) + } + + /// Make a request to a peer. + /// + /// Fails on: nonexistent peer, network error, + /// insufficient buffer. Does not check capabilities before sending. + /// On success, returns a request id which can later be coordinated + /// with an event. + pub fn request_from(&self, io: &NetworkContext, peer_id: &PeerId, request: Request) -> Result { + let mut peers = self.peers.write(); + let peer = try!(peers.get_mut(peer_id).ok_or_else(|| Error::UnknownPeer)); + peer.remote_flow.recharge(&mut peer.remote_buffer); + + let max = peer.remote_flow.compute_cost(request.kind(), request.amount()); + try!(peer.remote_buffer.deduct_cost(max)); + + let req_id = self.req_id.fetch_add(1, Ordering::SeqCst); + let packet_data = encode_request(&request, req_id); + + let packet_id = match request.kind() { + request::Kind::Headers => packet::GET_BLOCK_HEADERS, + request::Kind::Bodies => packet::GET_BLOCK_BODIES, + request::Kind::Receipts => packet::GET_RECEIPTS, + request::Kind::StateProofs => packet::GET_PROOFS, + request::Kind::Codes => packet::GET_CONTRACT_CODES, + request::Kind::HeaderProofs => packet::GET_HEADER_PROOFS, + }; + + try!(io.send(*peer_id, packet_id, packet_data)); + + peer.current_asking.insert(req_id); + self.pending_requests.write().insert(req_id, Requested { + request: request, + timestamp: SteadyTime::now(), + }); + + Ok(ReqId(req_id)) + } + /// Make an announcement of new chain head and capabilities to all peers. /// The announcement is expected to be valid. - pub fn make_announcement(&self, mut announcement: Announcement, io: &NetworkContext) { + pub fn make_announcement(&self, io: &NetworkContext, mut announcement: Announcement) { let mut reorgs_map = HashMap::new(); // update stored capabilities @@ -715,4 +800,86 @@ impl NetworkProtocolHandler for LightProtocol { _ => warn!(target: "les", "received timeout on unknown token {}", timer), } } +} + +// Helper for encoding the request to RLP with the given ID. +fn encode_request(req: &Request, req_id: usize) -> Vec { + match *req { + Request::Headers(ref headers) => { + let mut stream = RlpStream::new_list(5); + stream + .append(&req_id) + .begin_list(2) + .append(&headers.block_num) + .append(&headers.block_hash) + .append(&headers.max) + .append(&headers.skip) + .append(&headers.reverse); + stream.out() + } + Request::Bodies(ref request) => { + let mut stream = RlpStream::new_list(request.block_hashes.len() + 1); + stream.append(&req_id); + + for hash in &request.block_hashes { + stream.append(hash); + } + + stream.out() + } + Request::Receipts(ref request) => { + let mut stream = RlpStream::new_list(request.block_hashes.len() + 1); + stream.append(&req_id); + + for hash in &request.block_hashes { + stream.append(hash); + } + + stream.out() + } + Request::StateProofs(ref request) => { + let mut stream = RlpStream::new_list(request.requests.len() + 1); + stream.append(&req_id); + + for proof_req in &request.requests { + stream.begin_list(4) + .append(&proof_req.block) + .append(&proof_req.key1); + + match proof_req.key2 { + Some(ref key2) => stream.append(key2), + None => stream.append_empty_data(), + }; + + stream.append(&proof_req.from_level); + } + + stream.out() + } + Request::Codes(ref request) => { + let mut stream = RlpStream::new_list(request.code_requests.len() + 1); + stream.append(&req_id); + + for code_req in &request.code_requests { + stream.begin_list(2) + .append(&code_req.block_hash) + .append(&code_req.account_key); + } + + stream.out() + } + Request::HeaderProofs(ref request) => { + let mut stream = RlpStream::new_list(request.requests.len() + 1); + stream.append(&req_id); + + for proof_req in &request.requests { + stream.begin_list(3) + .append(&proof_req.cht_number) + .append(&proof_req.block_number) + .append(&proof_req.from_level); + } + + stream.out() + } + } } \ No newline at end of file diff --git a/ethcore/src/light/net/status.rs b/ethcore/src/light/net/status.rs index e8f874621..2c0c5f79a 100644 --- a/ethcore/src/light/net/status.rs +++ b/ethcore/src/light/net/status.rs @@ -183,8 +183,10 @@ pub struct Capabilities { /// Whether this peer can serve headers pub serve_headers: bool, /// Earliest block number it can serve block/receipt requests for. + /// `None` means no requests will be servable. pub serve_chain_since: Option, /// Earliest block number it can serve state requests for. + /// `None` means no requests will be servable. pub serve_state_since: Option, /// Whether it can relay transactions to the eth network. pub tx_relay: bool, diff --git a/ethcore/src/types/les_request.rs b/ethcore/src/types/les_request.rs index 09d96db09..d0de080ee 100644 --- a/ethcore/src/types/les_request.rs +++ b/ethcore/src/types/les_request.rs @@ -152,4 +152,16 @@ impl Request { Request::HeaderProofs(_) => Kind::HeaderProofs, } } + + /// Get the amount of requests being made. + pub fn amount(&self) -> usize { + match *self { + Request::Headers(ref req) => req.max, + Request::Bodies(ref req) => req.block_hashes.len(), + Request::Receipts(ref req) => req.block_hashes.len(), + Request::StateProofs(ref req) => req.requests.len(), + Request::Codes(ref req) => req.code_requests.len(), + Request::HeaderProofs(ref req) => req.requests.len(), + } + } } \ No newline at end of file