diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index ff7fd149b..243afa6f7 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -40,45 +40,6 @@ use util::{Bytes, U256, H256, Mutex, RwLock}; mod response; mod sync_round; -/// Light synchronization errors. -#[derive(Debug)] -pub enum Error { - /// Peer returned a malformed response. - MalformedResponse(response::BasicError), - /// Peer returned known bad block. - BadBlock, - /// Peer returned empty response. - EmptyResponse, - /// Peer returned a subchain with a broken parent connection. - ParentMismatch, - /// Protocol-level error. - ProtocolLevel(NetError), -} - -impl From for Error { - fn from(net_error: NetError) -> Self { - Error::ProtocolLevel(net_error) - } -} - -impl From for Error { - fn from(err: response::BasicError) -> Self { - Error::MalformedResponse(err) - } -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - Error::MalformedResponse(ref err) => write!(f, "{}", err), - Error::BadBlock => write!(f, "Block known to be bad"), - Error::EmptyResponse => write!(f, "Peer returned empty response."), - Error::ParentMismatch => write!(f, "Peer returned unknown block in place of parent."), - Error::ProtocolLevel(ref err) => write!(f, "Protocol level error: {}", err), - } - } -} - /// Peer chain info. #[derive(Clone)] struct ChainInfo { diff --git a/sync/src/light_sync/sync_round.rs b/sync/src/light_sync/sync_round.rs index 42a543ba3..a3584c34c 100644 --- a/sync/src/light_sync/sync_round.rs +++ b/sync/src/light_sync/sync_round.rs @@ -17,7 +17,7 @@ //! Header download state machine. use std::cmp::Ordering; -use std::collections::{BinaryHeap, HashMap, VecDeque}; +use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::mem; use ethcore::header::Header; @@ -30,7 +30,6 @@ use network::PeerId; use rlp::{UntrustedRlp, View}; use util::{Bytes, H256, Mutex}; -use super::{Error, Peer}; use super::response; // amount of blocks between each scaffold entry. @@ -43,11 +42,23 @@ const ROUND_FRAMES: usize = 255; // number of attempts to make to get a full scaffold for a sync round. const SCAFFOLD_ATTEMPTS: usize = 3; +/// Context for a headers response. +pub trait ResponseContext { + /// Get the peer who sent this response. + fn responder(&self) -> PeerId; + /// Get the request ID this response corresponds to. + fn req_id(&self) -> ReqId; + /// Get the (unverified) response data. + fn data(&self) -> &[Bytes]; + /// Punish the responder. + fn punish_responder(&self); +} + /// Reasons for sync round abort. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub enum AbortReason { - /// Bad chain downloaded. - BadChain, + /// Bad sparse header chain along with a list of peers who contributed to it. + BadScaffold(Vec), /// No incoming data. NoResponses, } @@ -82,12 +93,14 @@ pub struct Fetcher { requests: BinaryHeap, complete_requests: HashMap, pending: HashMap, + scaffold_contributors: Vec, } impl Fetcher { - // Produce a new fetcher given a sparse headerchain, in ascending order. + // Produce a new fetcher given a sparse headerchain, in ascending order along + // with a list of peers who helped produce the chain. // The headers must be valid RLP at this point. - fn new(sparse_headers: Vec
) -> Self { + fn new(sparse_headers: Vec
, contributors: Vec) -> Self { let mut requests = BinaryHeap::with_capacity(sparse_headers.len() - 1); for pair in sparse_headers.windows(2) { @@ -120,31 +133,43 @@ impl Fetcher { requests: requests, complete_requests: HashMap::new(), pending: HashMap::new(), + scaffold_contributors: contributors, } } - fn process_response(mut self, req_id: ReqId, headers: &[Bytes]) -> (SyncRound, Result<(), Error>) { - let mut request = match self.pending.remove(&req_id) { + fn process_response(mut self, ctx: &R) -> SyncRound { + let mut request = match self.pending.remove(&ctx.req_id()) { Some(request) => request, - None => return (SyncRound::Fetch(self), Ok(())), + None => return SyncRound::Fetch(self), }; + let headers = ctx.data(); + if headers.len() == 0 { - return (SyncRound::Fetch(self), Err(Error::EmptyResponse)); + trace!(target: "sync", "Punishing peer {} for empty response", ctx.responder()); + ctx.punish_responder(); + return SyncRound::Fetch(self); } match response::decode_and_verify(headers, &request.headers_request) { Err(e) => { - // TODO: track number of attempts per request. + trace!(target: "sync", "Punishing peer {} for invalid response ({})", ctx.responder(), e); + ctx.punish_responder(); + + // TODO: track number of attempts per request, + // abort if failure rate too high. self.requests.push(request); - (SyncRound::Fetch(self), Err(e).map_err(Into::into)) + SyncRound::Fetch(self) } Ok(headers) => { let mut parent_hash = None; for header in headers { if parent_hash.as_ref().map_or(false, |h| h != &header.hash()) { + trace!(target: "sync", "Punishing peer {} for parent mismatch", ctx.responder()); + ctx.punish_responder(); + self.requests.push(request); - return (SyncRound::Fetch(self), Err(Error::ParentMismatch)); + return SyncRound::Fetch(self); } // incrementally update the frame request as we go so we can @@ -161,23 +186,29 @@ impl Fetcher { // TODO: check subchain parent and punish peers who did framing // if it's inaccurate. if request.headers_request.max == 0 { + if parent_hash.map_or(true, |hash| hash != subchain_parent) { + let abort = AbortReason::BadScaffold(self.scaffold_contributors); + return SyncRound::Abort(abort); + } + self.complete_requests.insert(subchain_parent, request); } // state transition not triggered until drain is finished. - (SyncRound::Fetch(self), Ok(())) + (SyncRound::Fetch(self)) } } } } -// Round started: get stepped header chain. -// from a start block with number X we request 256 headers stepped by 256 from -// block X + 1. -struct RoundStart { +/// Round started: get stepped header chain. +/// from a start block with number X we request 256 headers stepped by 256 from +/// block X + 1. +pub struct RoundStart { start_block: (u64, H256), pending_req: Option<(ReqId, HeadersRequest)>, sparse_headers: Vec
, + contributors: HashSet, attempt: usize, } @@ -187,44 +218,49 @@ impl RoundStart { start_block: start.clone(), pending_req: None, sparse_headers: Vec::new(), + contributors: HashSet::new(), attempt: 0, } } - fn process_response(mut self, req_id: ReqId, headers: &[Bytes]) -> (SyncRound, Result<(), Error>) { + fn process_response(mut self, ctx: &R) -> SyncRound { let req = match self.pending_req.take() { - Some((id, ref req)) if req_id == id => { req.clone() } + Some((id, ref req)) if ctx.req_id() == id => { req.clone() } other => { self.pending_req = other; - return (SyncRound::Start(self), Ok(())) + return SyncRound::Start(self); } }; self.attempt += 1; - let res = match response::decode_and_verify(headers, &req) { + match response::decode_and_verify(ctx.data(), &req) { Ok(headers) => { + self.contributors.insert(ctx.responder()); self.sparse_headers.extend(headers); if self.sparse_headers.len() == ROUND_FRAMES + 1 { trace!(target: "sync", "Beginning fetch of blocks between {} sparse headers", self.sparse_headers.len()); - return (SyncRound::Fetch(Fetcher::new(self.sparse_headers)), Ok(())); + let fetcher = Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect()); + return SyncRound::Fetch(fetcher); } - - Ok(()) } - Err(e) => Err(e), + Err(e) => { + trace!(target: "sync", "Punishing peer {} for malformed response ({})", ctx.responder(), e); + ctx.punish_responder(); + } }; if self.attempt >= SCAFFOLD_ATTEMPTS { if self.sparse_headers.len() > 1 { - (SyncRound::Fetch(Fetcher::new(self.sparse_headers)), res.map_err(Into::into)) + let fetcher = Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect()); + SyncRound::Fetch(fetcher) } else { - (SyncRound::Abort(AbortReason::NoResponses), res.map_err(Into::into)) + SyncRound::Abort(AbortReason::NoResponses) } } else { - (SyncRound::Start(self), res.map_err(Into::into)) + SyncRound::Start(self) } } } @@ -247,23 +283,23 @@ impl SyncRound { } /// Process an answer to a request. Unknown requests will be ignored. - pub fn process_response(self, req_id: ReqId, headers: &[Bytes]) -> (Self, Result<(), Error>) { + pub fn process_response(self, ctx: &R) -> Self { match self { - SyncRound::Start(round_start) => round_start.process_response(req_id, headers), - SyncRound::Fetch(fetcher) => fetcher.process_response(req_id, headers), - other => (other, Ok(())), + SyncRound::Start(round_start) => round_start.process_response(ctx), + SyncRound::Fetch(fetcher) => fetcher.process_response(ctx), + other => other, } } /// Return unfulfilled requests from disconnected peer. Unknown requests will be ignored. - pub fn requests_abandoned(self, abandoned: &[ReqId]) -> (Self, Result<(), Error>) { + pub fn requests_abandoned(self, abandoned: &[ReqId]) -> Self { unimplemented!() } /// Dispatch pending requests. The dispatcher provided will attempt to /// find a suitable peer to serve the request. // TODO: have dispatcher take capabilities argument? - pub fn dispatch_requests(self, dispatcher: D) -> (Self, Result<(), Error>) + pub fn dispatch_requests(self, dispatcher: D) -> Self where D: Fn(HeadersRequest) -> Option { unimplemented!()