light: implement all response handlers

This commit is contained in:
Robert Habermeier 2016-12-07 15:27:04 +01:00
parent cdc758368a
commit 10d75b6de0
3 changed files with 151 additions and 24 deletions

View File

@ -54,6 +54,8 @@ pub enum Error {
WrongNetwork, WrongNetwork,
/// Unknown peer. /// Unknown peer.
UnknownPeer, UnknownPeer,
/// Unsolicited response.
UnsolicitedResponse,
} }
impl Error { impl Error {
@ -67,6 +69,7 @@ impl Error {
Error::UnexpectedHandshake => Punishment::Disconnect, Error::UnexpectedHandshake => Punishment::Disconnect,
Error::WrongNetwork => Punishment::Disable, Error::WrongNetwork => Punishment::Disable,
Error::UnknownPeer => Punishment::Disconnect, Error::UnknownPeer => Punishment::Disconnect,
Error::UnsolicitedResponse => Punishment::Disable,
} }
} }
} }
@ -92,7 +95,8 @@ impl fmt::Display for Error {
Error::UnrecognizedPacket(code) => write!(f, "Unrecognized packet: 0x{:x}", code), Error::UnrecognizedPacket(code) => write!(f, "Unrecognized packet: 0x{:x}", code),
Error::UnexpectedHandshake => write!(f, "Unexpected handshake"), Error::UnexpectedHandshake => write!(f, "Unexpected handshake"),
Error::WrongNetwork => write!(f, "Wrong network"), Error::WrongNetwork => write!(f, "Wrong network"),
Error::UnknownPeer => write!(f, "unknown peer"), Error::UnknownPeer => write!(f, "Unknown peer"),
Error::UnsolicitedResponse => write!(f, "Peer provided unsolicited data"),
} }
} }
} }

View File

@ -29,7 +29,7 @@ use util::hash::H256;
use util::{Bytes, Mutex, RwLock, U256}; use util::{Bytes, Mutex, RwLock, U256};
use time::SteadyTime; use time::SteadyTime;
use std::collections::{HashMap, HashSet}; use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use provider::Provider; use provider::Provider;
@ -103,7 +103,6 @@ struct PendingPeer {
struct Peer { struct Peer {
local_buffer: Buffer, // their buffer relative to us local_buffer: Buffer, // their buffer relative to us
remote_buffer: Buffer, // our buffer relative to them remote_buffer: Buffer, // our buffer relative to them
current_asking: HashSet<usize>, // pending request ids.
status: Status, status: Status,
capabilities: Capabilities, capabilities: Capabilities,
remote_flow: FlowParams, remote_flow: FlowParams,
@ -137,6 +136,7 @@ impl Peer {
} }
/// Context for a network event. /// Context for a network event.
#[derive(Clone)]
pub struct EventContext<'a> { pub struct EventContext<'a> {
/// Protocol implementation. /// Protocol implementation.
pub proto: &'a LightProtocol, pub proto: &'a LightProtocol,
@ -177,15 +177,16 @@ pub trait Handler: Send + Sync {
fn on_state_proofs(&self, _ctx: EventContext, _req_id: ReqId, _proofs: &[Vec<Bytes>]) { } fn on_state_proofs(&self, _ctx: EventContext, _req_id: ReqId, _proofs: &[Vec<Bytes>]) { }
/// Called when a peer responds with contract code. /// Called when a peer responds with contract code.
fn on_code(&self, _ctx: EventContext, _req_id: ReqId, _codes: &[Bytes]) { } fn on_code(&self, _ctx: EventContext, _req_id: ReqId, _codes: &[Bytes]) { }
/// Called when a peer responds with header proofs. Each proof is a series of trie /// Called when a peer responds with header proofs. Each proof is a block header coupled
/// nodes is ascending order by distance from the root. /// with a series of trie nodes is ascending order by distance from the root.
fn on_header_proofs(&self, _ctx: EventContext, _req_id: ReqId, _proofs: &[Vec<Bytes>]) { } fn on_header_proofs(&self, _ctx: EventContext, _req_id: ReqId, _proofs: &[(Bytes, Vec<Bytes>)]) { }
} }
// a request and the time it was made. // a request, the peer who it was made to, and the time it was made.
struct Requested { struct Requested {
request: Request, request: Request,
timestamp: SteadyTime, timestamp: SteadyTime,
peer_id: PeerId,
} }
/// Protocol parameters. /// Protocol parameters.
@ -280,10 +281,10 @@ impl LightProtocol {
try!(io.send(*peer_id, packet_id, packet_data)); try!(io.send(*peer_id, packet_id, packet_data));
peer.current_asking.insert(req_id);
self.pending_requests.write().insert(req_id, Requested { self.pending_requests.write().insert(req_id, Requested {
request: request, request: request,
timestamp: SteadyTime::now(), timestamp: SteadyTime::now(),
peer_id: *peer_id,
}); });
Ok(ReqId(req_id)) Ok(ReqId(req_id))
@ -331,6 +332,38 @@ impl LightProtocol {
pub fn add_handler(&mut self, handler: Box<Handler>) { pub fn add_handler(&mut self, handler: Box<Handler>) {
self.handlers.push(handler); self.handlers.push(handler);
} }
// Does the common pre-verification of responses before the response itself
// is actually decoded:
// - check whether peer exists
// - check whether request was made
// - check whether request kinds match
fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result<ReqId, Error> {
let req_id: usize = try!(raw.val_at(0));
let cur_buffer: U256 = try!(raw.val_at(1));
trace!(target: "les", "pre-verifying response from peer {}, kind={:?}", peer, kind);
match self.pending_requests.write().remove(&req_id) {
None => return Err(Error::UnsolicitedResponse),
Some(requested) => {
if requested.peer_id != *peer || requested.request.kind() != kind {
return Err(Error::UnsolicitedResponse)
}
}
}
let peers = self.peers.read();
match peers.get(peer) {
Some(peer_info) => {
let mut peer_info = peer_info.lock();
let actual_buffer = ::std::cmp::min(cur_buffer, *peer_info.remote_flow.limit());
peer_info.remote_buffer.update_to(actual_buffer);
Ok(ReqId(req_id))
}
None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind.
}
}
} }
impl LightProtocol { impl LightProtocol {
@ -352,8 +385,13 @@ impl LightProtocol {
// called when a peer disconnects. // called when a peer disconnects.
fn on_disconnect(&self, peer: PeerId, io: &NetworkContext) { fn on_disconnect(&self, peer: PeerId, io: &NetworkContext) {
self.pending_peers.write().remove(&peer); self.pending_peers.write().remove(&peer);
if let Some(peer_info) = self.peers.write().remove(&peer) { if self.peers.write().remove(&peer).is_some() {
let unfulfilled: Vec<_> = peer_info.into_inner().current_asking.into_iter().map(ReqId).collect(); let unfulfilled: Vec<_> = self.pending_requests.read()
.iter()
.filter(|&(_, r)| r.peer_id == peer)
.map(|(&id, _)| ReqId(id))
.collect();
{ {
let mut pending = self.pending_requests.write(); let mut pending = self.pending_requests.write();
for &ReqId(ref inner) in &unfulfilled { for &ReqId(ref inner) in &unfulfilled {
@ -417,7 +455,6 @@ impl LightProtocol {
self.peers.write().insert(*peer, Mutex::new(Peer { self.peers.write().insert(*peer, Mutex::new(Peer {
local_buffer: self.flow_params.create_buffer(), local_buffer: self.flow_params.create_buffer(),
remote_buffer: flow_params.create_buffer(), remote_buffer: flow_params.create_buffer(),
current_asking: HashSet::new(),
status: status.clone(), status: status.clone(),
capabilities: capabilities.clone(), capabilities: capabilities.clone(),
remote_flow: flow_params, remote_flow: flow_params,
@ -530,8 +567,19 @@ impl LightProtocol {
} }
// Receive a response for block headers. // Receive a response for block headers.
fn block_headers(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { fn block_headers(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> {
unimplemented!() let req_id = try!(self.pre_verify_response(peer, request::Kind::Headers, &raw));
let raw_headers: Vec<_> = raw.iter().skip(2).map(|x| x.as_raw().to_owned()).collect();
for handler in &self.handlers {
handler.on_block_headers(EventContext {
peer: *peer,
io: io,
proto: self,
}, req_id, &raw_headers);
}
Ok(())
} }
// Handle a request for block bodies. // Handle a request for block bodies.
@ -576,8 +624,19 @@ impl LightProtocol {
} }
// Receive a response for block bodies. // Receive a response for block bodies.
fn block_bodies(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { fn block_bodies(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> {
unimplemented!() let req_id = try!(self.pre_verify_response(peer, request::Kind::Bodies, &raw));
let raw_bodies: Vec<Bytes> = raw.iter().skip(2).map(|x| x.as_raw().to_owned()).collect();
for handler in &self.handlers {
handler.on_block_bodies(EventContext {
peer: *peer,
io: io,
proto: self,
}, req_id, &raw_bodies);
}
Ok(())
} }
// Handle a request for receipts. // Handle a request for receipts.
@ -622,8 +681,23 @@ impl LightProtocol {
} }
// Receive a response for receipts. // Receive a response for receipts.
fn receipts(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { fn receipts(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> {
unimplemented!() let req_id = try!(self.pre_verify_response(peer, request::Kind::Receipts, &raw));
let raw_receipts: Vec<Vec<Receipt>> = try!(raw
.iter()
.skip(2)
.map(|x| x.as_val())
.collect());
for handler in &self.handlers {
handler.on_receipts(EventContext {
peer: *peer,
io: io,
proto: self,
}, req_id, &raw_receipts);
}
Ok(())
} }
// Handle a request for proofs. // Handle a request for proofs.
@ -679,8 +753,23 @@ impl LightProtocol {
} }
// Receive a response for proofs. // Receive a response for proofs.
fn proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { fn proofs(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> {
unimplemented!() let req_id = try!(self.pre_verify_response(peer, request::Kind::StateProofs, &raw));
let raw_proofs: Vec<Vec<Bytes>> = raw.iter()
.skip(2)
.map(|x| x.iter().map(|node| node.as_raw().to_owned()).collect())
.collect();
for handler in &self.handlers {
handler.on_state_proofs(EventContext {
peer: *peer,
io: io,
proto: self,
}, req_id, &raw_proofs);
}
Ok(())
} }
// Handle a request for contract code. // Handle a request for contract code.
@ -734,8 +823,20 @@ impl LightProtocol {
} }
// Receive a response for contract code. // Receive a response for contract code.
fn contract_code(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { fn contract_code(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> {
unimplemented!() let req_id = try!(self.pre_verify_response(peer, request::Kind::Codes, &raw));
let raw_code: Vec<Bytes> = try!(raw.iter().skip(2).map(|x| x.as_val()).collect());
for handler in &self.handlers {
handler.on_code(EventContext {
peer: *peer,
io: io,
proto: self,
}, req_id, &raw_code);
}
Ok(())
} }
// Handle a request for header proofs // Handle a request for header proofs
@ -790,8 +891,27 @@ impl LightProtocol {
} }
// Receive a response for header proofs // Receive a response for header proofs
fn header_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { fn header_proofs(&self, peer: &PeerId, io: &NetworkContext, raw: UntrustedRlp) -> Result<(), Error> {
unimplemented!() fn decode_res(raw: UntrustedRlp) -> Result<(Bytes, Vec<Bytes>), ::rlp::DecoderError> {
Ok((
try!(raw.val_at(0)),
try!(raw.at(1)).iter().map(|x| x.as_raw().to_owned()).collect(),
))
}
let req_id = try!(self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw));
let raw_proofs: Vec<_> = try!(raw.iter().skip(2).map(decode_res).collect());
for handler in &self.handlers {
handler.on_header_proofs(EventContext {
peer: *peer,
io: io,
proto: self,
}, req_id, &raw_proofs);
}
Ok(())
} }
// Receive a set of transactions to relay. // Receive a set of transactions to relay.

View File

@ -71,7 +71,10 @@ pub trait Provider: Send + Sync {
/// Each item in the resulting vector is either the raw bytecode or empty. /// Each item in the resulting vector is either the raw bytecode or empty.
fn contract_code(&self, req: request::ContractCodes) -> Vec<Bytes>; fn contract_code(&self, req: request::ContractCodes) -> Vec<Bytes>;
/// Provide header proofs from the Canonical Hash Tries. /// Provide header proofs from the Canonical Hash Tries as well as the headers
/// they correspond to -- each element in the returned vector is a 2-tuple.
/// The first element is a block header and the second a merkle proof of
/// the header in a requested CHT.
fn header_proofs(&self, req: request::HeaderProofs) -> Vec<Bytes>; fn header_proofs(&self, req: request::HeaderProofs) -> Vec<Bytes>;
/// Provide pending transactions. /// Provide pending transactions.