From fa42b6acecab326f773ea3ec7a4d342b02d0dd18 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 16 Mar 2017 23:51:47 +0100 Subject: [PATCH] port ethsync to PIP messages --- sync/src/light_sync/mod.rs | 65 ++++++++++++++++++--------- sync/src/light_sync/response.rs | 34 +++++++------- sync/src/light_sync/sync_round.rs | 27 +++++------ sync/src/light_sync/tests/test_net.rs | 6 ++- 4 files changed, 79 insertions(+), 53 deletions(-) diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index fba89dd7b..4590103e7 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -16,7 +16,7 @@ //! Light client synchronization. //! -//! This will synchronize the header chain using LES messages. +//! This will synchronize the header chain using PIP messages. //! Dataflow is largely one-directional as headers are pushed into //! the light client queue for import. Where possible, they are batched //! in groups. @@ -36,14 +36,15 @@ use std::collections::HashMap; use std::mem; use std::sync::Arc; +use ethcore::encoded; use light::client::{AsLightClient, LightChainClient}; use light::net::{ Announcement, Handler, BasicContext, EventContext, - Capabilities, ReqId, Status, + Capabilities, ReqId, Status, Error as NetError, }; -use light::request; +use light::request::{self, CompleteHeadersRequest as HeadersRequest}; use network::PeerId; -use util::{Bytes, U256, H256, Mutex, RwLock}; +use util::{U256, H256, Mutex, RwLock}; use rand::{Rng, OsRng}; use self::sync_round::{AbortReason, SyncRound, ResponseContext}; @@ -91,7 +92,7 @@ impl Peer { #[derive(Debug)] enum AncestorSearch { Queued(u64), // queued to search for blocks starting from here. - Awaiting(ReqId, u64, request::Headers), // awaiting response for this request. + Awaiting(ReqId, u64, HeadersRequest), // awaiting response for this request. Prehistoric, // prehistoric block found. TODO: start to roll back CHTs. FoundCommon(u64, H256), // common block found. Genesis, // common ancestor is the genesis. @@ -113,7 +114,7 @@ impl AncestorSearch { match self { AncestorSearch::Awaiting(id, start, req) => { if &id == ctx.req_id() { - match response::decode_and_verify(ctx.data(), &req) { + match response::verify(ctx.data(), &req) { Ok(headers) => { for header in &headers { if client.is_known(&header.hash()) { @@ -150,17 +151,17 @@ impl AncestorSearch { } fn dispatch_request(self, mut dispatcher: F) -> AncestorSearch - where F: FnMut(request::Headers) -> Option + where F: FnMut(HeadersRequest) -> Option { - const BATCH_SIZE: usize = 64; + const BATCH_SIZE: u64 = 64; match self { AncestorSearch::Queued(start) => { - let batch_size = ::std::cmp::min(start as usize, BATCH_SIZE); + let batch_size = ::std::cmp::min(start, BATCH_SIZE); trace!(target: "sync", "Requesting {} reverse headers from {} to find common ancestor", batch_size, start); - let req = request::Headers { + let req = HeadersRequest { start: start.into(), max: batch_size, skip: 0, @@ -193,13 +194,13 @@ struct ResponseCtx<'a> { peer: PeerId, req_id: ReqId, ctx: &'a BasicContext, - data: &'a [Bytes], + data: &'a [encoded::Header], } impl<'a> ResponseContext for ResponseCtx<'a> { fn responder(&self) -> PeerId { self.peer } fn req_id(&self) -> &ReqId { &self.req_id } - fn data(&self) -> &[Bytes] { self.data } + fn data(&self) -> &[encoded::Header] { self.data } fn punish_responder(&self) { self.ctx.disable_peer(self.peer) } } @@ -313,11 +314,22 @@ impl Handler for LightSync { self.maintain_sync(ctx.as_basic()); } - fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) { - if !self.peers.read().contains_key(&ctx.peer()) { + fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[request::Response]) { + let peer = ctx.peer(); + if !self.peers.read().contains_key(&peer) { return } + let headers = match responses.get(0) { + Some(&request::Response::Headers(ref response)) => &response.headers[..], + Some(_) => { + trace!("Disabling peer {} for wrong response type.", peer); + ctx.disable_peer(peer); + &[] + } + None => &[], + }; + { let mut state = self.state.lock(); @@ -465,18 +477,27 @@ impl LightSync { // naive request dispatcher: just give to any peer which says it will // give us responses. - let dispatcher = move |req: request::Headers| { + let dispatcher = move |req: HeadersRequest| { rng.shuffle(&mut peer_ids); + let request = { + let mut builder = request::RequestBuilder::default(); + builder.push(request::Request::Headers(request::IncompleteHeadersRequest { + start: req.start.into(), + skip: req.skip, + max: req.max, + reverse: req.reverse, + })).expect("request provided fully complete with no unresolved back-references; qed"); + builder.build() + }; for peer in &peer_ids { - if ctx.max_requests(*peer, request::Kind::Headers) >= req.max { - match ctx.request_from(*peer, request::Request::Headers(req.clone())) { - Ok(id) => { - return Some(id) - } - Err(e) => - trace!(target: "sync", "Error requesting headers from viable peer: {}", e), + match ctx.request_from(*peer, request.clone()) { + Ok(id) => { + return Some(id) } + Err(NetError::NoCredits) => {} + Err(e) => + trace!(target: "sync", "Error requesting headers from viable peer: {}", e), } } diff --git a/sync/src/light_sync/response.rs b/sync/src/light_sync/response.rs index cb95824ce..d85d2548d 100644 --- a/sync/src/light_sync/response.rs +++ b/sync/src/light_sync/response.rs @@ -18,10 +18,11 @@ use std::fmt; +use ethcore::encoded; use ethcore::header::Header; -use light::request::{HashOrNumber, Headers as HeadersRequest}; -use rlp::{DecoderError, UntrustedRlp, View}; -use util::{Bytes, H256}; +use light::request::{HashOrNumber, CompleteHeadersRequest as HeadersRequest}; +use rlp::DecoderError; +use util::H256; /// Errors found when decoding headers and verifying with basic constraints. #[derive(Debug, PartialEq)] @@ -71,13 +72,13 @@ pub trait Constraint { fn verify(&self, headers: &[Header], reverse: bool) -> Result<(), Self::Error>; } -/// Decode a response and do basic verification against a request. -pub fn decode_and_verify(headers: &[Bytes], request: &HeadersRequest) -> Result, BasicError> { - let headers: Vec<_> = try!(headers.iter().map(|x| UntrustedRlp::new(&x).as_val()).collect()); +/// Do basic verification of provided headers against a request. +pub fn verify(headers: &[encoded::Header], request: &HeadersRequest) -> Result, BasicError> { + let headers: Vec<_> = headers.iter().map(|h| h.decode()).collect(); let reverse = request.reverse; - try!(Max(request.max).verify(&headers, reverse)); + try!(Max(request.max as usize).verify(&headers, reverse)); match request.start { HashOrNumber::Number(ref num) => try!(StartsAtNumber(*num).verify(&headers, reverse)), HashOrNumber::Hash(ref hash) => try!(StartsAtHash(*hash).verify(&headers, reverse)), @@ -150,8 +151,9 @@ impl Constraint for Max { #[cfg(test)] mod tests { + use ethcore::encoded; use ethcore::header::Header; - use light::request::Headers as HeadersRequest; + use light::request::CompleteHeadersRequest as HeadersRequest; use super::*; @@ -175,10 +177,10 @@ mod tests { parent_hash = Some(header.hash()); - ::rlp::encode(&header).to_vec() + encoded::Header::new(::rlp::encode(&header).to_vec()) }).collect(); - assert!(decode_and_verify(&headers, &request).is_ok()); + assert!(verify(&headers, &request).is_ok()); } #[test] @@ -201,10 +203,10 @@ mod tests { parent_hash = Some(header.hash()); - ::rlp::encode(&header).to_vec() + encoded::Header::new(::rlp::encode(&header).to_vec()) }).collect(); - assert!(decode_and_verify(&headers, &request).is_ok()); + assert!(verify(&headers, &request).is_ok()); } #[test] @@ -227,10 +229,10 @@ mod tests { parent_hash = Some(header.hash()); - ::rlp::encode(&header).to_vec() + encoded::Header::new(::rlp::encode(&header).to_vec()) }).collect(); - assert_eq!(decode_and_verify(&headers, &request), Err(BasicError::TooManyHeaders(20, 25))); + assert_eq!(verify(&headers, &request), Err(BasicError::TooManyHeaders(20, 25))); } #[test] @@ -246,9 +248,9 @@ mod tests { let mut header = Header::default(); header.set_number(x); - ::rlp::encode(&header).to_vec() + encoded::Header::new(::rlp::encode(&header).to_vec()) }).collect(); - assert_eq!(decode_and_verify(&headers, &request), Err(BasicError::WrongSkip(5, Some(2)))); + assert_eq!(verify(&headers, &request), Err(BasicError::WrongSkip(5, Some(2)))); } } diff --git a/sync/src/light_sync/sync_round.rs b/sync/src/light_sync/sync_round.rs index 6fa635214..dfa17aad4 100644 --- a/sync/src/light_sync/sync_round.rs +++ b/sync/src/light_sync/sync_round.rs @@ -20,13 +20,14 @@ use std::cmp::Ordering; use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::fmt; +use ethcore::encoded; use ethcore::header::Header; use light::net::ReqId; -use light::request::Headers as HeadersRequest; +use light::request::CompleteHeadersRequest as HeadersRequest; use network::PeerId; -use util::{Bytes, H256}; +use util::H256; use super::response; @@ -40,7 +41,7 @@ pub trait ResponseContext { /// Get the request ID this response corresponds to. fn req_id(&self) -> &ReqId; /// Get the (unverified) response data. - fn data(&self) -> &[Bytes]; + fn data(&self) -> &[encoded::Header]; /// Punish the responder. fn punish_responder(&self); } @@ -114,7 +115,7 @@ impl Fetcher { let needed_headers = HeadersRequest { start: high_rung.parent_hash().clone().into(), - max: diff as usize - 1, + max: diff - 1, skip: 0, reverse: true, }; @@ -190,7 +191,7 @@ impl Fetcher { return SyncRound::Fetch(self); } - match response::decode_and_verify(headers, &request.headers_request) { + match response::verify(headers, &request.headers_request) { Err(e) => { trace!(target: "sync", "Punishing peer {} for invalid response ({})", ctx.responder(), e); ctx.punish_responder(); @@ -286,21 +287,21 @@ impl Fetcher { } // Compute scaffold parameters from non-zero distance between start and target block: (skip, pivots). -fn scaffold_params(diff: u64) -> (u64, usize) { +fn scaffold_params(diff: u64) -> (u64, u64) { // default parameters. // amount of blocks between each scaffold pivot. const ROUND_SKIP: u64 = 255; // amount of scaffold pivots: these are the Xs in "X___X___X" - const ROUND_PIVOTS: usize = 256; + const ROUND_PIVOTS: u64 = 256; let rem = diff % (ROUND_SKIP + 1); if diff <= ROUND_SKIP { // just request headers from the start to the target. - (0, rem as usize) + (0, rem) } else { // the number of pivots necessary to exactly hit or overshoot the target. let pivots_to_target = (diff / (ROUND_SKIP + 1)) + if rem == 0 { 0 } else { 1 }; - let num_pivots = ::std::cmp::min(pivots_to_target, ROUND_PIVOTS as u64) as usize; + let num_pivots = ::std::cmp::min(pivots_to_target, ROUND_PIVOTS); (ROUND_SKIP, num_pivots) } } @@ -319,7 +320,7 @@ pub struct RoundStart { contributors: HashSet, attempt: usize, skip: u64, - pivots: usize, + pivots: u64, } impl RoundStart { @@ -372,7 +373,7 @@ impl RoundStart { } }; - match response::decode_and_verify(ctx.data(), &req) { + match response::verify(ctx.data(), &req) { Ok(headers) => { if self.sparse_headers.len() == 0 && headers.get(0).map_or(false, |x| x.parent_hash() != &self.start_block.1) { @@ -383,7 +384,7 @@ impl RoundStart { self.contributors.insert(ctx.responder()); self.sparse_headers.extend(headers); - if self.sparse_headers.len() == self.pivots { + if self.sparse_headers.len() as u64 == self.pivots { return if self.skip == 0 { SyncRound::abort(AbortReason::TargetReached, self.sparse_headers.into()) } else { @@ -429,7 +430,7 @@ impl RoundStart { let start = (self.start_block.0 + 1) + self.sparse_headers.len() as u64 * (self.skip + 1); - let max = self.pivots - self.sparse_headers.len(); + let max = self.pivots - self.sparse_headers.len() as u64; let headers_request = HeadersRequest { start: start.into(), diff --git a/sync/src/light_sync/tests/test_net.rs b/sync/src/light_sync/tests/test_net.rs index d0e472374..898f8766d 100644 --- a/sync/src/light_sync/tests/test_net.rs +++ b/sync/src/light_sync/tests/test_net.rs @@ -28,6 +28,7 @@ use io::IoChannel; use light::client::Client as LightClient; use light::net::{LightProtocol, IoContext, Capabilities, Params as LightParams}; use light::net::request_credits::FlowParams; +use light::provider::LightProvider; use network::{NodeId, PeerId}; use util::RwLock; @@ -71,7 +72,7 @@ enum PeerData { } // test peer type. -// Either a full peer or a LES peer. +// Either a full peer or a light peer. pub struct Peer { proto: LightProtocol, queue: RwLock>, @@ -115,7 +116,8 @@ impl Peer { }, }; - let mut proto = LightProtocol::new(chain.clone(), params); + let provider = LightProvider::new(chain.clone(), Arc::new(RwLock::new(Default::default()))); + let mut proto = LightProtocol::new(Arc::new(provider), params); proto.add_handler(sync.clone()); Peer { proto: proto,