light: add LightChainClient trait

This commit is contained in:
Robert Habermeier 2016-12-14 23:26:15 +01:00
parent 1bcfc9348d
commit 0768a61944
2 changed files with 79 additions and 169 deletions

View File

@ -19,6 +19,7 @@
use ethcore::block_import_error::BlockImportError; use ethcore::block_import_error::BlockImportError;
use ethcore::block_status::BlockStatus; use ethcore::block_status::BlockStatus;
use ethcore::ids::BlockId; use ethcore::ids::BlockId;
use ethcore::header::Header;
use ethcore::verification::queue::{self, HeaderQueue}; use ethcore::verification::queue::{self, HeaderQueue};
use ethcore::transaction::SignedTransaction; use ethcore::transaction::SignedTransaction;
use ethcore::blockchain_info::BlockChainInfo; use ethcore::blockchain_info::BlockChainInfo;
@ -43,6 +44,28 @@ pub struct Config {
queue: queue::Config, queue: queue::Config,
} }
/// Trait for interacting with the header chain abstractly.
pub trait LightChainClient: Send + Sync {
/// Get chain info.
fn chain_info(&self) -> BlockChainInfo;
/// Queue header to be verified. Required that all headers queued have their
/// parent queued prior.
fn queue_header(&self, header: Header) -> Result<H256, BlockImportError>;
/// Query whether a block is known.
fn is_known(&self, hash: &H256) -> bool;
/// Clear the queue.
fn clear_queue(&self);
/// Get queue info.
fn queue_info(&self) -> queue::QueueInfo;
/// Get the `i`th CHT root.
fn cht_root(&self, i: usize) -> Option<H256>;
}
/// Light client implementation. /// Light client implementation.
pub struct Client { pub struct Client {
queue: HeaderQueue, queue: HeaderQueue,
@ -60,10 +83,8 @@ impl Client {
} }
} }
/// Import a header as rlp-encoded bytes. /// Import a header to the queue for additional verification.
pub fn import_header(&self, bytes: Bytes) -> Result<H256, BlockImportError> { pub fn import_header(&self, header: Header) -> Result<H256, BlockImportError> {
let header = ::rlp::decode(&bytes);
self.queue.import(header).map_err(Into::into) self.queue.import(header).map_err(Into::into)
} }
@ -120,6 +141,30 @@ impl Client {
} }
} }
impl LightChainClient for Client {
fn chain_info(&self) -> BlockChainInfo { Client::chain_info(self) }
fn queue_header(&self, header: Header) -> Result<H256, BlockImportError> {
self.import_header(header)
}
fn is_known(&self, hash: &H256) -> bool {
self.status(hash) == BlockStatus::InChain
}
fn clear_queue(&self) {
self.queue.clear()
}
fn queue_info(&self) -> queue::QueueInfo {
self.queue.queue_info()
}
fn cht_root(&self, i: usize) -> Option<H256> {
Client::cht_root(self, i)
}
}
// dummy implementation -- may draw from canonical cache further on. // dummy implementation -- may draw from canonical cache further on.
impl Provider for Client { impl Provider for Client {
fn chain_info(&self) -> BlockChainInfo { fn chain_info(&self) -> BlockChainInfo {

View File

@ -22,7 +22,7 @@
//! in groups. //! in groups.
//! //!
//! This is written assuming that the client and sync service are running //! This is written assuming that the client and sync service are running
//! in the same binary; unlike a full node //! in the same binary; unlike a full node which might communicate via IPC.
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
@ -30,27 +30,22 @@ use std::sync::Arc;
use ethcore::header::Header; use ethcore::header::Header;
use light::client::Client; use light::client::LightChainClient;
use light::net::{Announcement, Error as NetError, Handler, EventContext, Capabilities, ReqId, Status}; use light::net::{Announcement, Error as NetError, Handler, EventContext, Capabilities, ReqId, Status};
use light::request; use light::request;
use network::PeerId; use network::PeerId;
use rlp::{UntrustedRlp, View}; use rlp::{DecoderError, UntrustedRlp, View};
use util::{Bytes, U256, H256, Mutex, RwLock}; use util::{Bytes, U256, H256, Mutex, RwLock};
// How many headers we request at a time when searching for best mod response;
// common ancestor with peer. mod sync_round;
const UNCONFIRMED_SEARCH_SIZE: u64 = 128;
#[derive(Debug)] #[derive(Debug)]
enum Error { enum Error {
// Peer is useless for now.
UselessPeer,
// Peer returned a malformed response. // Peer returned a malformed response.
MalformedResponse, MalformedResponse(response::BasicError),
// Peer returned known bad block. // Peer returned known bad block.
BadBlock, BadBlock,
// Peer had a prehistoric common ancestor.
PrehistoricAncestor,
// Protocol-level error. // Protocol-level error.
ProtocolLevel(NetError), ProtocolLevel(NetError),
} }
@ -61,13 +56,17 @@ impl From<NetError> for Error {
} }
} }
impl From<response::BasicError> for Error {
fn from(err: response::BasicError) -> Self {
Error::MalformedResponse(err)
}
}
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self { match *self {
Error::UselessPeer => write!(f, "Peer is useless"), Error::MalformedResponse(ref err) => write!(f, "{}", err),
Error::MalformedResponse => write!(f, "Response malformed"),
Error::BadBlock => write!(f, "Block known to be bad"), Error::BadBlock => write!(f, "Block known to be bad"),
Error::PrehistoricAncestor => write!(f, "Common ancestor is prehistoric"),
Error::ProtocolLevel(ref err) => write!(f, "Protocol level error: {}", err), Error::ProtocolLevel(ref err) => write!(f, "Protocol level error: {}", err),
} }
} }
@ -81,106 +80,29 @@ struct ChainInfo {
head_num: u64, head_num: u64,
} }
/// A peer we haven't found a common ancestor for yet. struct Peer {
struct UnconfirmedPeer { first_status: ChainInfo,
chain_info: ChainInfo, status: ChainInfo,
last_batched: u64,
req_id: ReqId,
} }
impl UnconfirmedPeer { impl Peer {
/// Create an unconfirmed peer. Returns `None` if we cannot make a /// Create a peer object.
/// common ancestors request for some reason. The event context provided fn new(chain_info: ChainInfo) -> Self {
/// should be associated with this peer. Peer {
fn create(ctx: &EventContext, chain_info: ChainInfo, best_num: u64) -> Result<Self, Error> { first_status: chain_info.clone(),
let this = ctx.peer(); status: chain_info.clone(),
if ctx.max_requests(this, request::Kind::Headers) < UNCONFIRMED_SEARCH_SIZE as usize {
return Err(Error::UselessPeer); // a peer which allows this few header reqs isn't useful anyway.
}
let req_id = try!(ctx.request_from(this, request::Request::Headers(request::Headers {
start: best_num.into(),
max: ::std::cmp::min(best_num, UNCONFIRMED_SEARCH_SIZE) as usize,
skip: 0,
reverse: true,
})));
Ok(UnconfirmedPeer {
chain_info: chain_info,
last_batched: best_num,
req_id: req_id,
})
}
/// Feed in the result of the headers query. If an error occurs, the request
/// is malformed. If a common (hash, number) pair is returned then this is
/// the common ancestor. If not, then another request for headers has been
/// dispatched.
fn check_batch(&mut self, ctx: &EventContext, client: &Client, headers: &[Bytes]) -> Result<Option<H256>, Error> {
use ethcore::block_status::BlockStatus;
let mut cur_num = self.last_batched;
let chain_info = client.chain_info();
for raw_header in headers {
let header: Header = try!(UntrustedRlp::new(&raw_header).as_val().map_err(|_| Error::MalformedResponse));
if header.number() != cur_num { return Err(Error::MalformedResponse) }
if chain_info.first_block_number.map_or(false, |f| header.number() < f) {
return Err(Error::PrehistoricAncestor);
}
let hash = header.hash();
match client.status(&hash) {
BlockStatus::InChain => return Ok(Some(hash)),
BlockStatus::Bad => return Err(Error::BadBlock),
BlockStatus::Unknown | BlockStatus::Queued => {},
}
cur_num -= 1;
}
let this = ctx.peer();
if cur_num == 0 {
trace!(target: "sync", "Peer {}: genesis as common ancestor", this);
return Ok(Some(chain_info.genesis_hash));
}
// nothing found, nothing prehistoric.
// send the next request.
let req_id = try!(ctx.request_from(this, request::Request::Headers(request::Headers {
start: cur_num.into(),
max: ::std::cmp::min(cur_num, UNCONFIRMED_SEARCH_SIZE) as usize,
skip: 0,
reverse: true,
})));
self.req_id = req_id;
Ok(None)
} }
} }
/// Connected peers as state machines.
///
/// On connection, we'll search for a common ancestor to their chain.
/// Once that's found, we can sync to this peer.
enum Peer {
// Searching for a common ancestor.
SearchCommon(UnconfirmedPeer),
// A peer we can sync to.
SyncTo(ChainInfo),
} }
/// Light client synchronization manager. See module docs for more details. /// Light client synchronization manager. See module docs for more details.
pub struct LightSync { pub struct LightSync<L: LightChainClient> {
best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network. best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network.
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization. peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization.
client: Arc<Client>, client: Arc<L>,
} }
impl Handler for LightSync { impl<L: LightChainClient> Handler for LightSync<L> {
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
let our_best = self.client.chain_info().best_block_number; let our_best = self.client.chain_info().best_block_number;
@ -195,34 +117,12 @@ impl Handler for LightSync {
head_num: status.head_num, head_num: status.head_num,
}; };
trace!(target: "sync", "Beginning search for common ancestor with peer {}", ctx.peer()); self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info)));
let unconfirmed = match UnconfirmedPeer::create(ctx, chain_info, our_best) {
Ok(unconfirmed) => unconfirmed,
Err(e) => {
trace!(target: "sync", "Failed to create unconfirmed peer: {}", e);
return;
}
};
self.peers.write().insert(ctx.peer(), Mutex::new(Peer::SearchCommon(unconfirmed)));
} }
fn on_disconnect(&self, ctx: &EventContext, _unfulfilled: &[ReqId]) { fn on_disconnect(&self, ctx: &EventContext, _unfulfilled: &[ReqId]) {
let peer = ctx.peer(); let peer_id = ctx.peer();
match self.peers.write().remove(&peer).map(|peer_data| peer_data.into_inner()) {
None => {}
Some(Peer::SearchCommon(_)) => {
// unfulfilled requests are unimportant since they are only
// relevant to searching for a common ancestor.
trace!(target: "sync", "Unconfirmed peer {} disconnect", ctx.peer());
}
Some(Peer::SyncTo(_)) => {
trace!(target: "sync", "")
// in this case we may want to reasssign all unfulfilled requests.
// (probably just by pushing them back into the current downloader's priority queue.)
}
}
} }
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
@ -234,55 +134,20 @@ impl Handler for LightSync {
trace!(target: "sync", "Announcement from peer {}: new chain head {:?}, reorg depth {}", trace!(target: "sync", "Announcement from peer {}: new chain head {:?}, reorg depth {}",
ctx.peer(), (announcement.head_hash, announcement.head_num), announcement.reorg_depth); ctx.peer(), (announcement.head_hash, announcement.head_num), announcement.reorg_depth);
} }
fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) { fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) {
let peer = ctx.peer(); let peer_id = ctx.peer();
match self.peers.read().get(&peer) {
None => {},
Some(peer_data) => {
let mut peer_data = peer_data.lock();
let new_peer = match *peer_data {
Peer::SearchCommon(ref mut unconfirmed) => {
if unconfirmed.req_id != req_id {
trace!(target: "sync", "Ignoring irrelevant response from peer {}", peer);
return;
}
match unconfirmed.check_batch(ctx, &self.client, headers) {
Ok(None) => {
trace!(target: "sync", "Continuing to search for common ancestor with peer {}", peer);
return;
}
Ok(Some(common)) => {
trace!(target: "sync", "Found common ancestor {} with peer {}", peer, common);
let chain_info = unconfirmed.chain_info.clone();
Peer::SyncTo(chain_info)
}
Err(e) => {
trace!(target: "sync", "Failed to find common ancestor with peer {}: {}", peer, e);
return;
}
}
}
Peer::SyncTo(_) => {
trace!(target: "sync", "Incoming response from peer being synced to.");
},
};
*peer_data = new_peer;
}
}
} }
} }
// public API // public API
impl LightSync { impl<L: LightChainClient> LightSync<L> {
/// Create a new instance of `LightSync`. /// Create a new instance of `LightSync`.
/// ///
/// This won't do anything until registered as a handler /// This won't do anything until registered as a handler
/// so it can act on events. /// so it can act on events.
pub fn new(client: Arc<Client>) -> Self { pub fn new(client: Arc<L>) -> Self {
LightSync { LightSync {
best_seen: Mutex::new(None), best_seen: Mutex::new(None),
peers: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()),