diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 9f08c4ca6..965e46db6 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -36,9 +36,11 @@ use provider::Provider; use request::{self, Request}; use self::buffer_flow::{Buffer, FlowParams}; +use self::context::{IoContext, EventContext, Ctx}; use self::error::{Error, Punishment}; mod buffer_flow; +mod context; mod error; mod status; @@ -135,18 +137,6 @@ impl Peer { } } -/// Context for a network event. -#[derive(Clone)] -pub struct EventContext<'a> { - /// Protocol implementation. - pub proto: &'a LightProtocol, - /// Network context to enable immediate response to - /// events. - pub io: &'a NetworkContext<'a>, - /// Relevant peer for event. - pub peer: PeerId, -} - /// An LES event handler. /// /// Each handler function takes a context which describes the relevant peer @@ -158,28 +148,28 @@ pub struct EventContext<'a> { /// that relevant data will be stored by interested handlers. pub trait Handler: Send + Sync { /// Called when a peer connects. - fn on_connect(&self, _ctx: EventContext, _status: &Status, _capabilities: &Capabilities) { } + fn on_connect(&self, _ctx: &EventContext, _status: &Status, _capabilities: &Capabilities) { } /// Called when a peer disconnects, with a list of unfulfilled request IDs as /// of yet. - fn on_disconnect(&self, _ctx: EventContext, _unfulfilled: &[ReqId]) { } + fn on_disconnect(&self, _ctx: &EventContext, _unfulfilled: &[ReqId]) { } /// Called when a peer makes an announcement. - fn on_announcement(&self, _ctx: EventContext, _announcement: &Announcement) { } + fn on_announcement(&self, _ctx: &EventContext, _announcement: &Announcement) { } /// Called when a peer requests relay of some transactions. - fn on_transactions(&self, _ctx: EventContext, _relay: &[SignedTransaction]) { } + fn on_transactions(&self, _ctx: &EventContext, _relay: &[SignedTransaction]) { } /// Called when a peer responds with block bodies. - fn on_block_bodies(&self, _ctx: EventContext, _req_id: ReqId, _bodies: &[Bytes]) { } + fn on_block_bodies(&self, _ctx: &EventContext, _req_id: ReqId, _bodies: &[Bytes]) { } /// Called when a peer responds with block headers. - fn on_block_headers(&self, _ctx: EventContext, _req_id: ReqId, _headers: &[Bytes]) { } + fn on_block_headers(&self, _ctx: &EventContext, _req_id: ReqId, _headers: &[Bytes]) { } /// Called when a peer responds with block receipts. - fn on_receipts(&self, _ctx: EventContext, _req_id: ReqId, _receipts: &[Vec]) { } + fn on_receipts(&self, _ctx: &EventContext, _req_id: ReqId, _receipts: &[Vec]) { } /// Called when a peer responds with state proofs. Each proof is a series of trie /// nodes in ascending order by distance from the root. - fn on_state_proofs(&self, _ctx: EventContext, _req_id: ReqId, _proofs: &[Vec]) { } + fn on_state_proofs(&self, _ctx: &EventContext, _req_id: ReqId, _proofs: &[Vec]) { } /// Called when a peer responds with contract code. - fn on_code(&self, _ctx: EventContext, _req_id: ReqId, _codes: &[Bytes]) { } + fn on_code(&self, _ctx: &EventContext, _req_id: ReqId, _codes: &[Bytes]) { } /// Called when a peer responds with header proofs. Each proof is a block header coupled /// with a series of trie nodes is ascending order by distance from the root. - fn on_header_proofs(&self, _ctx: EventContext, _req_id: ReqId, _proofs: &[(Bytes, Vec)]) { } + fn on_header_proofs(&self, _ctx: &EventContext, _req_id: ReqId, _proofs: &[(Bytes, Vec)]) { } } // a request, the peer who it was made to, and the time it was made. @@ -257,7 +247,7 @@ impl LightProtocol { /// insufficient buffer. Does not check capabilities before sending. /// On success, returns a request id which can later be coordinated /// with an event. - pub fn request_from(&self, io: &NetworkContext, peer_id: &PeerId, request: Request) -> Result { + pub fn request_from(&self, io: &IoContext, peer_id: &PeerId, request: Request) -> Result { let peers = self.peers.read(); let peer = try!(peers.get(peer_id).ok_or_else(|| Error::UnknownPeer)); let mut peer = peer.lock(); @@ -279,7 +269,7 @@ impl LightProtocol { request::Kind::HeaderProofs => packet::GET_HEADER_PROOFS, }; - try!(io.send(*peer_id, packet_id, packet_data)); + io.send(*peer_id, packet_id, packet_data); self.pending_requests.write().insert(req_id, Requested { request: request, @@ -292,7 +282,7 @@ impl LightProtocol { /// Make an announcement of new chain head and capabilities to all peers. /// The announcement is expected to be valid. - pub fn make_announcement(&self, io: &NetworkContext, mut announcement: Announcement) { + pub fn make_announcement(&self, io: &IoContext, mut announcement: Announcement) { let mut reorgs_map = HashMap::new(); // update stored capabilities @@ -318,9 +308,7 @@ impl LightProtocol { peer_info.sent_head = announcement.head_hash; announcement.reorg_depth = *reorg_depth; - if let Err(e) = io.send(*peer_id, packet::ANNOUNCE, status::write_announcement(&announcement)) { - debug!(target: "les", "Error sending to peer {}: {}", peer_id, e); - } + io.send(*peer_id, packet::ANNOUNCE, status::write_announcement(&announcement)); } } @@ -364,11 +352,61 @@ impl LightProtocol { None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind. } } + + // handle a packet using the given io context. + fn handle_packet(&self, io: &IoContext, peer: &PeerId, packet_id: u8, data: &[u8]) { + let rlp = UntrustedRlp::new(data); + + // handle the packet + let res = match packet_id { + packet::STATUS => self.status(peer, io, rlp), + packet::ANNOUNCE => self.announcement(peer, io, rlp), + + packet::GET_BLOCK_HEADERS => self.get_block_headers(peer, io, rlp), + packet::BLOCK_HEADERS => self.block_headers(peer, io, rlp), + + packet::GET_BLOCK_BODIES => self.get_block_bodies(peer, io, rlp), + packet::BLOCK_BODIES => self.block_bodies(peer, io, rlp), + + packet::GET_RECEIPTS => self.get_receipts(peer, io, rlp), + packet::RECEIPTS => self.receipts(peer, io, rlp), + + packet::GET_PROOFS => self.get_proofs(peer, io, rlp), + packet::PROOFS => self.proofs(peer, io, rlp), + + packet::GET_CONTRACT_CODES => self.get_contract_code(peer, io, rlp), + packet::CONTRACT_CODES => self.contract_code(peer, io, rlp), + + packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp), + packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp), + + packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp), + + other => { + Err(Error::UnrecognizedPacket(other)) + } + }; + + // if something went wrong, figure out how much to punish the peer. + if let Err(e) = res { + match e.punishment() { + Punishment::None => {} + Punishment::Disconnect => { + debug!(target: "les", "Disconnecting peer {}: {}", peer, e); + io.disconnect_peer(*peer) + } + Punishment::Disable => { + debug!(target: "les", "Disabling peer {}: {}", peer, e); + io.disable_peer(*peer) + } + } + } + } } impl LightProtocol { // called when a peer connects. - fn on_connect(&self, peer: &PeerId, io: &NetworkContext) { + fn on_connect(&self, peer: &PeerId, io: &IoContext) { let peer = *peer; match self.send_status(peer, io) { @@ -383,7 +421,7 @@ impl LightProtocol { } // called when a peer disconnects. - fn on_disconnect(&self, peer: PeerId, io: &NetworkContext) { + fn on_disconnect(&self, peer: PeerId, io: &IoContext) { self.pending_peers.write().remove(&peer); if self.peers.write().remove(&peer).is_some() { let unfulfilled: Vec<_> = self.pending_requests.read() @@ -400,7 +438,7 @@ impl LightProtocol { } for handler in &self.handlers { - handler.on_disconnect(EventContext { + handler.on_disconnect(&Ctx { peer: peer, io: io, proto: self, @@ -410,7 +448,7 @@ impl LightProtocol { } // send status to a peer. - fn send_status(&self, peer: PeerId, io: &NetworkContext) -> Result { + fn send_status(&self, peer: PeerId, io: &IoContext) -> Result { let chain_info = self.provider.chain_info(); // TODO: could update capabilities here. @@ -428,7 +466,7 @@ impl LightProtocol { let capabilities = self.capabilities.read().clone(); let status_packet = status::write_handshake(&status, &capabilities, &self.flow_params); - try!(io.send(peer, packet::STATUS, status_packet)); + io.send(peer, packet::STATUS, status_packet); Ok(PendingPeer { sent_head: chain_info.best_block_hash, @@ -436,7 +474,7 @@ impl LightProtocol { } // Handle status message from peer. - fn status(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn status(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { let pending = match self.pending_peers.write().remove(peer) { Some(pending) => pending, None => { @@ -462,7 +500,7 @@ impl LightProtocol { })); for handler in &self.handlers { - handler.on_connect(EventContext { + handler.on_connect(&Ctx { peer: *peer, io: io, proto: self, @@ -473,7 +511,7 @@ impl LightProtocol { } // Handle an announcement. - fn announcement(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn announcement(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { if !self.peers.read().contains_key(peer) { debug!(target: "les", "Ignoring announcement from unknown peer"); return Ok(()) @@ -507,7 +545,7 @@ impl LightProtocol { } for handler in &self.handlers { - handler.on_announcement(EventContext { + handler.on_announcement(&Ctx { peer: *peer, io: io, proto: self, @@ -518,7 +556,7 @@ impl LightProtocol { } // Handle a request for block headers. - fn get_block_headers(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn get_block_headers(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { const MAX_HEADERS: usize = 512; let peers = self.peers.read(); @@ -563,16 +601,18 @@ impl LightProtocol { } stream.out() - }).map_err(Into::into) + }); + + Ok(()) } // Receive a response for block headers. - fn block_headers(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> { + fn block_headers(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { let req_id = try!(self.pre_verify_response(peer, request::Kind::Headers, &raw)); let raw_headers: Vec<_> = raw.iter().skip(2).map(|x| x.as_raw().to_owned()).collect(); for handler in &self.handlers { - handler.on_block_headers(EventContext { + handler.on_block_headers(&Ctx { peer: *peer, io: io, proto: self, @@ -583,7 +623,7 @@ impl LightProtocol { } // Handle a request for block bodies. - fn get_block_bodies(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn get_block_bodies(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { const MAX_BODIES: usize = 256; let peers = self.peers.read(); @@ -620,16 +660,18 @@ impl LightProtocol { } stream.out() - }).map_err(Into::into) + }); + + Ok(()) } // Receive a response for block bodies. - fn block_bodies(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> { + fn block_bodies(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { let req_id = try!(self.pre_verify_response(peer, request::Kind::Bodies, &raw)); let raw_bodies: Vec = raw.iter().skip(2).map(|x| x.as_raw().to_owned()).collect(); for handler in &self.handlers { - handler.on_block_bodies(EventContext { + handler.on_block_bodies(&Ctx { peer: *peer, io: io, proto: self, @@ -640,7 +682,7 @@ impl LightProtocol { } // Handle a request for receipts. - fn get_receipts(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn get_receipts(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { const MAX_RECEIPTS: usize = 256; let peers = self.peers.read(); @@ -677,11 +719,13 @@ impl LightProtocol { } stream.out() - }).map_err(Into::into) + }); + + Ok(()) } // Receive a response for receipts. - fn receipts(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> { + fn receipts(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { let req_id = try!(self.pre_verify_response(peer, request::Kind::Receipts, &raw)); let raw_receipts: Vec> = try!(raw .iter() @@ -690,7 +734,7 @@ impl LightProtocol { .collect()); for handler in &self.handlers { - handler.on_receipts(EventContext { + handler.on_receipts(&Ctx { peer: *peer, io: io, proto: self, @@ -701,7 +745,7 @@ impl LightProtocol { } // Handle a request for proofs. - fn get_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn get_proofs(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { const MAX_PROOFS: usize = 128; let peers = self.peers.read(); @@ -749,11 +793,13 @@ impl LightProtocol { } stream.out() - }).map_err(Into::into) + }); + + Ok(()) } // Receive a response for proofs. - fn proofs(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> { + fn proofs(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { let req_id = try!(self.pre_verify_response(peer, request::Kind::StateProofs, &raw)); let raw_proofs: Vec> = raw.iter() @@ -762,7 +808,7 @@ impl LightProtocol { .collect(); for handler in &self.handlers { - handler.on_state_proofs(EventContext { + handler.on_state_proofs(&Ctx { peer: *peer, io: io, proto: self, @@ -773,7 +819,7 @@ impl LightProtocol { } // Handle a request for contract code. - fn get_contract_code(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn get_contract_code(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { const MAX_CODES: usize = 256; let peers = self.peers.read(); @@ -819,17 +865,19 @@ impl LightProtocol { } stream.out() - }).map_err(Into::into) + }); + + Ok(()) } // Receive a response for contract code. - fn contract_code(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> { + fn contract_code(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { let req_id = try!(self.pre_verify_response(peer, request::Kind::Codes, &raw)); let raw_code: Vec = try!(raw.iter().skip(2).map(|x| x.as_val()).collect()); for handler in &self.handlers { - handler.on_code(EventContext { + handler.on_code(&Ctx { peer: *peer, io: io, proto: self, @@ -840,7 +888,7 @@ impl LightProtocol { } // Handle a request for header proofs - fn get_header_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn get_header_proofs(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { const MAX_PROOFS: usize = 256; let peers = self.peers.read(); @@ -887,11 +935,13 @@ impl LightProtocol { } stream.out() - }).map_err(Into::into) + }); + + Ok(()) } // Receive a response for header proofs - fn header_proofs(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> { + fn header_proofs(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { fn decode_res(raw: UntrustedRlp) -> Result<(Bytes, Vec), ::rlp::DecoderError> { Ok(( try!(raw.val_at(0)), @@ -904,7 +954,7 @@ impl LightProtocol { let raw_proofs: Vec<_> = try!(raw.iter().skip(2).map(decode_res).collect()); for handler in &self.handlers { - handler.on_header_proofs(EventContext { + handler.on_header_proofs(&Ctx { peer: *peer, io: io, proto: self, @@ -915,7 +965,7 @@ impl LightProtocol { } // Receive a set of transactions to relay. - fn relay_transactions(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { + fn relay_transactions(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { const MAX_TRANSACTIONS: usize = 256; let txs: Vec<_> = try!(data.iter().take(MAX_TRANSACTIONS).map(|x| x.as_val::()).collect()); @@ -923,7 +973,7 @@ impl LightProtocol { debug!(target: "les", "Received {} transactions to relay from peer {}", txs.len(), peer); for handler in &self.handlers { - handler.on_transactions(EventContext { + handler.on_transactions(&Ctx { peer: *peer, io: io, proto: self, @@ -940,52 +990,7 @@ impl NetworkProtocolHandler for LightProtocol { } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { - let rlp = UntrustedRlp::new(data); - - // handle the packet - let res = match packet_id { - packet::STATUS => self.status(peer, io, rlp), - packet::ANNOUNCE => self.announcement(peer, io, rlp), - - packet::GET_BLOCK_HEADERS => self.get_block_headers(peer, io, rlp), - packet::BLOCK_HEADERS => self.block_headers(peer, io, rlp), - - packet::GET_BLOCK_BODIES => self.get_block_bodies(peer, io, rlp), - packet::BLOCK_BODIES => self.block_bodies(peer, io, rlp), - - packet::GET_RECEIPTS => self.get_receipts(peer, io, rlp), - packet::RECEIPTS => self.receipts(peer, io, rlp), - - packet::GET_PROOFS => self.get_proofs(peer, io, rlp), - packet::PROOFS => self.proofs(peer, io, rlp), - - packet::GET_CONTRACT_CODES => self.get_contract_code(peer, io, rlp), - packet::CONTRACT_CODES => self.contract_code(peer, io, rlp), - - packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp), - packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp), - - packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp), - - other => { - Err(Error::UnrecognizedPacket(other)) - } - }; - - // if something went wrong, figure out how much to punish the peer. - if let Err(e) = res { - match e.punishment() { - Punishment::None => {} - Punishment::Disconnect => { - debug!(target: "les", "Disconnecting peer {}: {}", peer, e); - io.disconnect_peer(*peer) - } - Punishment::Disable => { - debug!(target: "les", "Disabling peer {}: {}", peer, e); - io.disable_peer(*peer) - } - } - } + self.handle_packet(io, peer, packet_id, data); } fn connected(&self, io: &NetworkContext, peer: &PeerId) {