provide response context to response handler

This commit is contained in:
Robert Habermeier 2016-12-15 15:50:36 +01:00
parent 5d8bfd8758
commit ec88a992e3
2 changed files with 72 additions and 75 deletions

View File

@ -40,45 +40,6 @@ use util::{Bytes, U256, H256, Mutex, RwLock};
mod response; mod response;
mod sync_round; mod sync_round;
/// Light synchronization errors.
#[derive(Debug)]
pub enum Error {
/// Peer returned a malformed response.
MalformedResponse(response::BasicError),
/// Peer returned known bad block.
BadBlock,
/// Peer returned empty response.
EmptyResponse,
/// Peer returned a subchain with a broken parent connection.
ParentMismatch,
/// Protocol-level error.
ProtocolLevel(NetError),
}
impl From<NetError> for Error {
fn from(net_error: NetError) -> Self {
Error::ProtocolLevel(net_error)
}
}
impl From<response::BasicError> for Error {
fn from(err: response::BasicError) -> Self {
Error::MalformedResponse(err)
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::MalformedResponse(ref err) => write!(f, "{}", err),
Error::BadBlock => write!(f, "Block known to be bad"),
Error::EmptyResponse => write!(f, "Peer returned empty response."),
Error::ParentMismatch => write!(f, "Peer returned unknown block in place of parent."),
Error::ProtocolLevel(ref err) => write!(f, "Protocol level error: {}", err),
}
}
}
/// Peer chain info. /// Peer chain info.
#[derive(Clone)] #[derive(Clone)]
struct ChainInfo { struct ChainInfo {

View File

@ -17,7 +17,7 @@
//! Header download state machine. //! Header download state machine.
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap, VecDeque}; use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::mem; use std::mem;
use ethcore::header::Header; use ethcore::header::Header;
@ -30,7 +30,6 @@ use network::PeerId;
use rlp::{UntrustedRlp, View}; use rlp::{UntrustedRlp, View};
use util::{Bytes, H256, Mutex}; use util::{Bytes, H256, Mutex};
use super::{Error, Peer};
use super::response; use super::response;
// amount of blocks between each scaffold entry. // amount of blocks between each scaffold entry.
@ -43,11 +42,23 @@ const ROUND_FRAMES: usize = 255;
// number of attempts to make to get a full scaffold for a sync round. // number of attempts to make to get a full scaffold for a sync round.
const SCAFFOLD_ATTEMPTS: usize = 3; const SCAFFOLD_ATTEMPTS: usize = 3;
/// Context for a headers response.
pub trait ResponseContext {
/// Get the peer who sent this response.
fn responder(&self) -> PeerId;
/// Get the request ID this response corresponds to.
fn req_id(&self) -> ReqId;
/// Get the (unverified) response data.
fn data(&self) -> &[Bytes];
/// Punish the responder.
fn punish_responder(&self);
}
/// Reasons for sync round abort. /// Reasons for sync round abort.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone)]
pub enum AbortReason { pub enum AbortReason {
/// Bad chain downloaded. /// Bad sparse header chain along with a list of peers who contributed to it.
BadChain, BadScaffold(Vec<PeerId>),
/// No incoming data. /// No incoming data.
NoResponses, NoResponses,
} }
@ -82,12 +93,14 @@ pub struct Fetcher {
requests: BinaryHeap<SubchainRequest>, requests: BinaryHeap<SubchainRequest>,
complete_requests: HashMap<H256, SubchainRequest>, complete_requests: HashMap<H256, SubchainRequest>,
pending: HashMap<ReqId, SubchainRequest>, pending: HashMap<ReqId, SubchainRequest>,
scaffold_contributors: Vec<PeerId>,
} }
impl Fetcher { impl Fetcher {
// Produce a new fetcher given a sparse headerchain, in ascending order. // Produce a new fetcher given a sparse headerchain, in ascending order along
// with a list of peers who helped produce the chain.
// The headers must be valid RLP at this point. // The headers must be valid RLP at this point.
fn new(sparse_headers: Vec<Header>) -> Self { fn new(sparse_headers: Vec<Header>, contributors: Vec<PeerId>) -> Self {
let mut requests = BinaryHeap::with_capacity(sparse_headers.len() - 1); let mut requests = BinaryHeap::with_capacity(sparse_headers.len() - 1);
for pair in sparse_headers.windows(2) { for pair in sparse_headers.windows(2) {
@ -120,31 +133,43 @@ impl Fetcher {
requests: requests, requests: requests,
complete_requests: HashMap::new(), complete_requests: HashMap::new(),
pending: HashMap::new(), pending: HashMap::new(),
scaffold_contributors: contributors,
} }
} }
fn process_response(mut self, req_id: ReqId, headers: &[Bytes]) -> (SyncRound, Result<(), Error>) { fn process_response<R: ResponseContext>(mut self, ctx: &R) -> SyncRound {
let mut request = match self.pending.remove(&req_id) { let mut request = match self.pending.remove(&ctx.req_id()) {
Some(request) => request, Some(request) => request,
None => return (SyncRound::Fetch(self), Ok(())), None => return SyncRound::Fetch(self),
}; };
let headers = ctx.data();
if headers.len() == 0 { if headers.len() == 0 {
return (SyncRound::Fetch(self), Err(Error::EmptyResponse)); trace!(target: "sync", "Punishing peer {} for empty response", ctx.responder());
ctx.punish_responder();
return SyncRound::Fetch(self);
} }
match response::decode_and_verify(headers, &request.headers_request) { match response::decode_and_verify(headers, &request.headers_request) {
Err(e) => { Err(e) => {
// TODO: track number of attempts per request. trace!(target: "sync", "Punishing peer {} for invalid response ({})", ctx.responder(), e);
ctx.punish_responder();
// TODO: track number of attempts per request,
// abort if failure rate too high.
self.requests.push(request); self.requests.push(request);
(SyncRound::Fetch(self), Err(e).map_err(Into::into)) SyncRound::Fetch(self)
} }
Ok(headers) => { Ok(headers) => {
let mut parent_hash = None; let mut parent_hash = None;
for header in headers { for header in headers {
if parent_hash.as_ref().map_or(false, |h| h != &header.hash()) { if parent_hash.as_ref().map_or(false, |h| h != &header.hash()) {
trace!(target: "sync", "Punishing peer {} for parent mismatch", ctx.responder());
ctx.punish_responder();
self.requests.push(request); self.requests.push(request);
return (SyncRound::Fetch(self), Err(Error::ParentMismatch)); return SyncRound::Fetch(self);
} }
// incrementally update the frame request as we go so we can // incrementally update the frame request as we go so we can
@ -161,23 +186,29 @@ impl Fetcher {
// TODO: check subchain parent and punish peers who did framing // TODO: check subchain parent and punish peers who did framing
// if it's inaccurate. // if it's inaccurate.
if request.headers_request.max == 0 { if request.headers_request.max == 0 {
if parent_hash.map_or(true, |hash| hash != subchain_parent) {
let abort = AbortReason::BadScaffold(self.scaffold_contributors);
return SyncRound::Abort(abort);
}
self.complete_requests.insert(subchain_parent, request); self.complete_requests.insert(subchain_parent, request);
} }
// state transition not triggered until drain is finished. // state transition not triggered until drain is finished.
(SyncRound::Fetch(self), Ok(())) (SyncRound::Fetch(self))
} }
} }
} }
} }
// Round started: get stepped header chain. /// Round started: get stepped header chain.
// from a start block with number X we request 256 headers stepped by 256 from /// from a start block with number X we request 256 headers stepped by 256 from
// block X + 1. /// block X + 1.
struct RoundStart { pub struct RoundStart {
start_block: (u64, H256), start_block: (u64, H256),
pending_req: Option<(ReqId, HeadersRequest)>, pending_req: Option<(ReqId, HeadersRequest)>,
sparse_headers: Vec<Header>, sparse_headers: Vec<Header>,
contributors: HashSet<PeerId>,
attempt: usize, attempt: usize,
} }
@ -187,44 +218,49 @@ impl RoundStart {
start_block: start.clone(), start_block: start.clone(),
pending_req: None, pending_req: None,
sparse_headers: Vec::new(), sparse_headers: Vec::new(),
contributors: HashSet::new(),
attempt: 0, attempt: 0,
} }
} }
fn process_response(mut self, req_id: ReqId, headers: &[Bytes]) -> (SyncRound, Result<(), Error>) { fn process_response<R: ResponseContext>(mut self, ctx: &R) -> SyncRound {
let req = match self.pending_req.take() { let req = match self.pending_req.take() {
Some((id, ref req)) if req_id == id => { req.clone() } Some((id, ref req)) if ctx.req_id() == id => { req.clone() }
other => { other => {
self.pending_req = other; self.pending_req = other;
return (SyncRound::Start(self), Ok(())) return SyncRound::Start(self);
} }
}; };
self.attempt += 1; self.attempt += 1;
let res = match response::decode_and_verify(headers, &req) { match response::decode_and_verify(ctx.data(), &req) {
Ok(headers) => { Ok(headers) => {
self.contributors.insert(ctx.responder());
self.sparse_headers.extend(headers); self.sparse_headers.extend(headers);
if self.sparse_headers.len() == ROUND_FRAMES + 1 { if self.sparse_headers.len() == ROUND_FRAMES + 1 {
trace!(target: "sync", "Beginning fetch of blocks between {} sparse headers", trace!(target: "sync", "Beginning fetch of blocks between {} sparse headers",
self.sparse_headers.len()); self.sparse_headers.len());
return (SyncRound::Fetch(Fetcher::new(self.sparse_headers)), Ok(())); let fetcher = Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect());
return SyncRound::Fetch(fetcher);
} }
Ok(())
} }
Err(e) => Err(e), Err(e) => {
trace!(target: "sync", "Punishing peer {} for malformed response ({})", ctx.responder(), e);
ctx.punish_responder();
}
}; };
if self.attempt >= SCAFFOLD_ATTEMPTS { if self.attempt >= SCAFFOLD_ATTEMPTS {
if self.sparse_headers.len() > 1 { if self.sparse_headers.len() > 1 {
(SyncRound::Fetch(Fetcher::new(self.sparse_headers)), res.map_err(Into::into)) let fetcher = Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect());
SyncRound::Fetch(fetcher)
} else { } else {
(SyncRound::Abort(AbortReason::NoResponses), res.map_err(Into::into)) SyncRound::Abort(AbortReason::NoResponses)
} }
} else { } else {
(SyncRound::Start(self), res.map_err(Into::into)) SyncRound::Start(self)
} }
} }
} }
@ -247,23 +283,23 @@ impl SyncRound {
} }
/// Process an answer to a request. Unknown requests will be ignored. /// Process an answer to a request. Unknown requests will be ignored.
pub fn process_response(self, req_id: ReqId, headers: &[Bytes]) -> (Self, Result<(), Error>) { pub fn process_response<R: ResponseContext>(self, ctx: &R) -> Self {
match self { match self {
SyncRound::Start(round_start) => round_start.process_response(req_id, headers), SyncRound::Start(round_start) => round_start.process_response(ctx),
SyncRound::Fetch(fetcher) => fetcher.process_response(req_id, headers), SyncRound::Fetch(fetcher) => fetcher.process_response(ctx),
other => (other, Ok(())), other => other,
} }
} }
/// Return unfulfilled requests from disconnected peer. Unknown requests will be ignored. /// Return unfulfilled requests from disconnected peer. Unknown requests will be ignored.
pub fn requests_abandoned(self, abandoned: &[ReqId]) -> (Self, Result<(), Error>) { pub fn requests_abandoned(self, abandoned: &[ReqId]) -> Self {
unimplemented!() unimplemented!()
} }
/// Dispatch pending requests. The dispatcher provided will attempt to /// Dispatch pending requests. The dispatcher provided will attempt to
/// find a suitable peer to serve the request. /// find a suitable peer to serve the request.
// TODO: have dispatcher take capabilities argument? // TODO: have dispatcher take capabilities argument?
pub fn dispatch_requests<D>(self, dispatcher: D) -> (Self, Result<(), Error>) pub fn dispatch_requests<D>(self, dispatcher: D) -> Self
where D: Fn(HeadersRequest) -> Option<ReqId> where D: Fn(HeadersRequest) -> Option<ReqId>
{ {
unimplemented!() unimplemented!()