port ethsync to PIP messages

This commit is contained in:
Robert Habermeier 2017-03-16 23:51:47 +01:00
parent b5527415d6
commit fa42b6acec
4 changed files with 79 additions and 53 deletions

View File

@ -16,7 +16,7 @@
//! Light client synchronization. //! Light client synchronization.
//! //!
//! This will synchronize the header chain using LES messages. //! This will synchronize the header chain using PIP messages.
//! Dataflow is largely one-directional as headers are pushed into //! Dataflow is largely one-directional as headers are pushed into
//! the light client queue for import. Where possible, they are batched //! the light client queue for import. Where possible, they are batched
//! in groups. //! in groups.
@ -36,14 +36,15 @@ use std::collections::HashMap;
use std::mem; use std::mem;
use std::sync::Arc; use std::sync::Arc;
use ethcore::encoded;
use light::client::{AsLightClient, LightChainClient}; use light::client::{AsLightClient, LightChainClient};
use light::net::{ use light::net::{
Announcement, Handler, BasicContext, EventContext, Announcement, Handler, BasicContext, EventContext,
Capabilities, ReqId, Status, Capabilities, ReqId, Status, Error as NetError,
}; };
use light::request; use light::request::{self, CompleteHeadersRequest as HeadersRequest};
use network::PeerId; use network::PeerId;
use util::{Bytes, U256, H256, Mutex, RwLock}; use util::{U256, H256, Mutex, RwLock};
use rand::{Rng, OsRng}; use rand::{Rng, OsRng};
use self::sync_round::{AbortReason, SyncRound, ResponseContext}; use self::sync_round::{AbortReason, SyncRound, ResponseContext};
@ -91,7 +92,7 @@ impl Peer {
#[derive(Debug)] #[derive(Debug)]
enum AncestorSearch { enum AncestorSearch {
Queued(u64), // queued to search for blocks starting from here. Queued(u64), // queued to search for blocks starting from here.
Awaiting(ReqId, u64, request::Headers), // awaiting response for this request. Awaiting(ReqId, u64, HeadersRequest), // awaiting response for this request.
Prehistoric, // prehistoric block found. TODO: start to roll back CHTs. Prehistoric, // prehistoric block found. TODO: start to roll back CHTs.
FoundCommon(u64, H256), // common block found. FoundCommon(u64, H256), // common block found.
Genesis, // common ancestor is the genesis. Genesis, // common ancestor is the genesis.
@ -113,7 +114,7 @@ impl AncestorSearch {
match self { match self {
AncestorSearch::Awaiting(id, start, req) => { AncestorSearch::Awaiting(id, start, req) => {
if &id == ctx.req_id() { if &id == ctx.req_id() {
match response::decode_and_verify(ctx.data(), &req) { match response::verify(ctx.data(), &req) {
Ok(headers) => { Ok(headers) => {
for header in &headers { for header in &headers {
if client.is_known(&header.hash()) { if client.is_known(&header.hash()) {
@ -150,17 +151,17 @@ impl AncestorSearch {
} }
fn dispatch_request<F>(self, mut dispatcher: F) -> AncestorSearch fn dispatch_request<F>(self, mut dispatcher: F) -> AncestorSearch
where F: FnMut(request::Headers) -> Option<ReqId> where F: FnMut(HeadersRequest) -> Option<ReqId>
{ {
const BATCH_SIZE: usize = 64; const BATCH_SIZE: u64 = 64;
match self { match self {
AncestorSearch::Queued(start) => { AncestorSearch::Queued(start) => {
let batch_size = ::std::cmp::min(start as usize, BATCH_SIZE); let batch_size = ::std::cmp::min(start, BATCH_SIZE);
trace!(target: "sync", "Requesting {} reverse headers from {} to find common ancestor", trace!(target: "sync", "Requesting {} reverse headers from {} to find common ancestor",
batch_size, start); batch_size, start);
let req = request::Headers { let req = HeadersRequest {
start: start.into(), start: start.into(),
max: batch_size, max: batch_size,
skip: 0, skip: 0,
@ -193,13 +194,13 @@ struct ResponseCtx<'a> {
peer: PeerId, peer: PeerId,
req_id: ReqId, req_id: ReqId,
ctx: &'a BasicContext, ctx: &'a BasicContext,
data: &'a [Bytes], data: &'a [encoded::Header],
} }
impl<'a> ResponseContext for ResponseCtx<'a> { impl<'a> ResponseContext for ResponseCtx<'a> {
fn responder(&self) -> PeerId { self.peer } fn responder(&self) -> PeerId { self.peer }
fn req_id(&self) -> &ReqId { &self.req_id } fn req_id(&self) -> &ReqId { &self.req_id }
fn data(&self) -> &[Bytes] { self.data } fn data(&self) -> &[encoded::Header] { self.data }
fn punish_responder(&self) { self.ctx.disable_peer(self.peer) } fn punish_responder(&self) { self.ctx.disable_peer(self.peer) }
} }
@ -313,11 +314,22 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
self.maintain_sync(ctx.as_basic()); self.maintain_sync(ctx.as_basic());
} }
fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) { fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[request::Response]) {
if !self.peers.read().contains_key(&ctx.peer()) { let peer = ctx.peer();
if !self.peers.read().contains_key(&peer) {
return return
} }
let headers = match responses.get(0) {
Some(&request::Response::Headers(ref response)) => &response.headers[..],
Some(_) => {
trace!("Disabling peer {} for wrong response type.", peer);
ctx.disable_peer(peer);
&[]
}
None => &[],
};
{ {
let mut state = self.state.lock(); let mut state = self.state.lock();
@ -465,18 +477,27 @@ impl<L: AsLightClient> LightSync<L> {
// naive request dispatcher: just give to any peer which says it will // naive request dispatcher: just give to any peer which says it will
// give us responses. // give us responses.
let dispatcher = move |req: request::Headers| { let dispatcher = move |req: HeadersRequest| {
rng.shuffle(&mut peer_ids); rng.shuffle(&mut peer_ids);
let request = {
let mut builder = request::RequestBuilder::default();
builder.push(request::Request::Headers(request::IncompleteHeadersRequest {
start: req.start.into(),
skip: req.skip,
max: req.max,
reverse: req.reverse,
})).expect("request provided fully complete with no unresolved back-references; qed");
builder.build()
};
for peer in &peer_ids { for peer in &peer_ids {
if ctx.max_requests(*peer, request::Kind::Headers) >= req.max { match ctx.request_from(*peer, request.clone()) {
match ctx.request_from(*peer, request::Request::Headers(req.clone())) { Ok(id) => {
Ok(id) => { return Some(id)
return Some(id)
}
Err(e) =>
trace!(target: "sync", "Error requesting headers from viable peer: {}", e),
} }
Err(NetError::NoCredits) => {}
Err(e) =>
trace!(target: "sync", "Error requesting headers from viable peer: {}", e),
} }
} }

View File

@ -18,10 +18,11 @@
use std::fmt; use std::fmt;
use ethcore::encoded;
use ethcore::header::Header; use ethcore::header::Header;
use light::request::{HashOrNumber, Headers as HeadersRequest}; use light::request::{HashOrNumber, CompleteHeadersRequest as HeadersRequest};
use rlp::{DecoderError, UntrustedRlp, View}; use rlp::DecoderError;
use util::{Bytes, H256}; use util::H256;
/// Errors found when decoding headers and verifying with basic constraints. /// Errors found when decoding headers and verifying with basic constraints.
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -71,13 +72,13 @@ pub trait Constraint {
fn verify(&self, headers: &[Header], reverse: bool) -> Result<(), Self::Error>; fn verify(&self, headers: &[Header], reverse: bool) -> Result<(), Self::Error>;
} }
/// Decode a response and do basic verification against a request. /// Do basic verification of provided headers against a request.
pub fn decode_and_verify(headers: &[Bytes], request: &HeadersRequest) -> Result<Vec<Header>, BasicError> { pub fn verify(headers: &[encoded::Header], request: &HeadersRequest) -> Result<Vec<Header>, BasicError> {
let headers: Vec<_> = try!(headers.iter().map(|x| UntrustedRlp::new(&x).as_val()).collect()); let headers: Vec<_> = headers.iter().map(|h| h.decode()).collect();
let reverse = request.reverse; let reverse = request.reverse;
try!(Max(request.max).verify(&headers, reverse)); try!(Max(request.max as usize).verify(&headers, reverse));
match request.start { match request.start {
HashOrNumber::Number(ref num) => try!(StartsAtNumber(*num).verify(&headers, reverse)), HashOrNumber::Number(ref num) => try!(StartsAtNumber(*num).verify(&headers, reverse)),
HashOrNumber::Hash(ref hash) => try!(StartsAtHash(*hash).verify(&headers, reverse)), HashOrNumber::Hash(ref hash) => try!(StartsAtHash(*hash).verify(&headers, reverse)),
@ -150,8 +151,9 @@ impl Constraint for Max {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use ethcore::encoded;
use ethcore::header::Header; use ethcore::header::Header;
use light::request::Headers as HeadersRequest; use light::request::CompleteHeadersRequest as HeadersRequest;
use super::*; use super::*;
@ -175,10 +177,10 @@ mod tests {
parent_hash = Some(header.hash()); parent_hash = Some(header.hash());
::rlp::encode(&header).to_vec() encoded::Header::new(::rlp::encode(&header).to_vec())
}).collect(); }).collect();
assert!(decode_and_verify(&headers, &request).is_ok()); assert!(verify(&headers, &request).is_ok());
} }
#[test] #[test]
@ -201,10 +203,10 @@ mod tests {
parent_hash = Some(header.hash()); parent_hash = Some(header.hash());
::rlp::encode(&header).to_vec() encoded::Header::new(::rlp::encode(&header).to_vec())
}).collect(); }).collect();
assert!(decode_and_verify(&headers, &request).is_ok()); assert!(verify(&headers, &request).is_ok());
} }
#[test] #[test]
@ -227,10 +229,10 @@ mod tests {
parent_hash = Some(header.hash()); parent_hash = Some(header.hash());
::rlp::encode(&header).to_vec() encoded::Header::new(::rlp::encode(&header).to_vec())
}).collect(); }).collect();
assert_eq!(decode_and_verify(&headers, &request), Err(BasicError::TooManyHeaders(20, 25))); assert_eq!(verify(&headers, &request), Err(BasicError::TooManyHeaders(20, 25)));
} }
#[test] #[test]
@ -246,9 +248,9 @@ mod tests {
let mut header = Header::default(); let mut header = Header::default();
header.set_number(x); header.set_number(x);
::rlp::encode(&header).to_vec() encoded::Header::new(::rlp::encode(&header).to_vec())
}).collect(); }).collect();
assert_eq!(decode_and_verify(&headers, &request), Err(BasicError::WrongSkip(5, Some(2)))); assert_eq!(verify(&headers, &request), Err(BasicError::WrongSkip(5, Some(2))));
} }
} }

View File

@ -20,13 +20,14 @@ use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::fmt; use std::fmt;
use ethcore::encoded;
use ethcore::header::Header; use ethcore::header::Header;
use light::net::ReqId; use light::net::ReqId;
use light::request::Headers as HeadersRequest; use light::request::CompleteHeadersRequest as HeadersRequest;
use network::PeerId; use network::PeerId;
use util::{Bytes, H256}; use util::H256;
use super::response; use super::response;
@ -40,7 +41,7 @@ pub trait ResponseContext {
/// Get the request ID this response corresponds to. /// Get the request ID this response corresponds to.
fn req_id(&self) -> &ReqId; fn req_id(&self) -> &ReqId;
/// Get the (unverified) response data. /// Get the (unverified) response data.
fn data(&self) -> &[Bytes]; fn data(&self) -> &[encoded::Header];
/// Punish the responder. /// Punish the responder.
fn punish_responder(&self); fn punish_responder(&self);
} }
@ -114,7 +115,7 @@ impl Fetcher {
let needed_headers = HeadersRequest { let needed_headers = HeadersRequest {
start: high_rung.parent_hash().clone().into(), start: high_rung.parent_hash().clone().into(),
max: diff as usize - 1, max: diff - 1,
skip: 0, skip: 0,
reverse: true, reverse: true,
}; };
@ -190,7 +191,7 @@ impl Fetcher {
return SyncRound::Fetch(self); return SyncRound::Fetch(self);
} }
match response::decode_and_verify(headers, &request.headers_request) { match response::verify(headers, &request.headers_request) {
Err(e) => { Err(e) => {
trace!(target: "sync", "Punishing peer {} for invalid response ({})", ctx.responder(), e); trace!(target: "sync", "Punishing peer {} for invalid response ({})", ctx.responder(), e);
ctx.punish_responder(); ctx.punish_responder();
@ -286,21 +287,21 @@ impl Fetcher {
} }
// Compute scaffold parameters from non-zero distance between start and target block: (skip, pivots). // Compute scaffold parameters from non-zero distance between start and target block: (skip, pivots).
fn scaffold_params(diff: u64) -> (u64, usize) { fn scaffold_params(diff: u64) -> (u64, u64) {
// default parameters. // default parameters.
// amount of blocks between each scaffold pivot. // amount of blocks between each scaffold pivot.
const ROUND_SKIP: u64 = 255; const ROUND_SKIP: u64 = 255;
// amount of scaffold pivots: these are the Xs in "X___X___X" // amount of scaffold pivots: these are the Xs in "X___X___X"
const ROUND_PIVOTS: usize = 256; const ROUND_PIVOTS: u64 = 256;
let rem = diff % (ROUND_SKIP + 1); let rem = diff % (ROUND_SKIP + 1);
if diff <= ROUND_SKIP { if diff <= ROUND_SKIP {
// just request headers from the start to the target. // just request headers from the start to the target.
(0, rem as usize) (0, rem)
} else { } else {
// the number of pivots necessary to exactly hit or overshoot the target. // the number of pivots necessary to exactly hit or overshoot the target.
let pivots_to_target = (diff / (ROUND_SKIP + 1)) + if rem == 0 { 0 } else { 1 }; let pivots_to_target = (diff / (ROUND_SKIP + 1)) + if rem == 0 { 0 } else { 1 };
let num_pivots = ::std::cmp::min(pivots_to_target, ROUND_PIVOTS as u64) as usize; let num_pivots = ::std::cmp::min(pivots_to_target, ROUND_PIVOTS);
(ROUND_SKIP, num_pivots) (ROUND_SKIP, num_pivots)
} }
} }
@ -319,7 +320,7 @@ pub struct RoundStart {
contributors: HashSet<PeerId>, contributors: HashSet<PeerId>,
attempt: usize, attempt: usize,
skip: u64, skip: u64,
pivots: usize, pivots: u64,
} }
impl RoundStart { impl RoundStart {
@ -372,7 +373,7 @@ impl RoundStart {
} }
}; };
match response::decode_and_verify(ctx.data(), &req) { match response::verify(ctx.data(), &req) {
Ok(headers) => { Ok(headers) => {
if self.sparse_headers.len() == 0 if self.sparse_headers.len() == 0
&& headers.get(0).map_or(false, |x| x.parent_hash() != &self.start_block.1) { && headers.get(0).map_or(false, |x| x.parent_hash() != &self.start_block.1) {
@ -383,7 +384,7 @@ impl RoundStart {
self.contributors.insert(ctx.responder()); self.contributors.insert(ctx.responder());
self.sparse_headers.extend(headers); self.sparse_headers.extend(headers);
if self.sparse_headers.len() == self.pivots { if self.sparse_headers.len() as u64 == self.pivots {
return if self.skip == 0 { return if self.skip == 0 {
SyncRound::abort(AbortReason::TargetReached, self.sparse_headers.into()) SyncRound::abort(AbortReason::TargetReached, self.sparse_headers.into())
} else { } else {
@ -429,7 +430,7 @@ impl RoundStart {
let start = (self.start_block.0 + 1) let start = (self.start_block.0 + 1)
+ self.sparse_headers.len() as u64 * (self.skip + 1); + self.sparse_headers.len() as u64 * (self.skip + 1);
let max = self.pivots - self.sparse_headers.len(); let max = self.pivots - self.sparse_headers.len() as u64;
let headers_request = HeadersRequest { let headers_request = HeadersRequest {
start: start.into(), start: start.into(),

View File

@ -28,6 +28,7 @@ use io::IoChannel;
use light::client::Client as LightClient; use light::client::Client as LightClient;
use light::net::{LightProtocol, IoContext, Capabilities, Params as LightParams}; use light::net::{LightProtocol, IoContext, Capabilities, Params as LightParams};
use light::net::request_credits::FlowParams; use light::net::request_credits::FlowParams;
use light::provider::LightProvider;
use network::{NodeId, PeerId}; use network::{NodeId, PeerId};
use util::RwLock; use util::RwLock;
@ -71,7 +72,7 @@ enum PeerData {
} }
// test peer type. // test peer type.
// Either a full peer or a LES peer. // Either a full peer or a light peer.
pub struct Peer { pub struct Peer {
proto: LightProtocol, proto: LightProtocol,
queue: RwLock<VecDeque<TestPacket>>, queue: RwLock<VecDeque<TestPacket>>,
@ -115,7 +116,8 @@ impl Peer {
}, },
}; };
let mut proto = LightProtocol::new(chain.clone(), params); let provider = LightProvider::new(chain.clone(), Arc::new(RwLock::new(Default::default())));
let mut proto = LightProtocol::new(Arc::new(provider), params);
proto.add_handler(sync.clone()); proto.add_handler(sync.clone());
Peer { Peer {
proto: proto, proto: proto,