diff --git a/ethcore/light/src/net/context.rs b/ethcore/light/src/net/context.rs index 74d8ad811..e95434a3b 100644 --- a/ethcore/light/src/net/context.rs +++ b/ethcore/light/src/net/context.rs @@ -105,6 +105,9 @@ pub trait EventContext: BasicContext { /// Get the peer relevant to the event e.g. message sender, /// disconnected/connected peer. fn peer(&self) -> PeerId; + + /// Treat the event context as a basic context. + fn as_basic(&self) -> &BasicContext; } /// Basic context. @@ -182,4 +185,8 @@ impl<'a> EventContext for Ctx<'a> { fn peer(&self) -> PeerId { self.peer } + + fn as_basic(&self) -> &BasicContext { + &*self + } } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index fc9e5de74..504f4e97c 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -54,11 +54,12 @@ extern crate ethcore_ipc as ipc; mod chain; mod blocks; mod block_sync; -mod light_sync; mod sync_io; mod snapshot; mod transactions_stats; +pub mod light_sync; + #[cfg(test)] mod tests; diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index 243afa6f7..12bb6a11e 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -25,18 +25,22 @@ //! in the same binary; unlike a full node which might communicate via IPC. use std::collections::HashMap; -use std::fmt; +use std::mem; use std::sync::Arc; use ethcore::header::Header; use light::client::LightChainClient; -use light::net::{Announcement, Error as NetError, Handler, EventContext, Capabilities, ReqId, Status}; +use light::net::{ + Announcement, Handler, BasicContext, EventContext, + Capabilities, ReqId, Status +}; use light::request; use network::PeerId; -use rlp::{DecoderError, UntrustedRlp, View}; use util::{Bytes, U256, H256, Mutex, RwLock}; +use self::sync_round::{SyncRound, ResponseContext}; + mod response; mod sync_round; @@ -49,7 +53,6 @@ struct ChainInfo { } struct Peer { - first_status: ChainInfo, status: ChainInfo, } @@ -57,17 +60,48 @@ impl Peer { /// Create a peer object. fn new(chain_info: ChainInfo) -> Self { Peer { - first_status: chain_info.clone(), status: chain_info.clone(), } } } +// Search for a common ancestor. +struct AncestorSearch { + last_batched: u64, + req_id: ReqId, +} + +// synchronization state machine. +enum SyncState { + // Idle (waiting for peers) + Idle, + // searching for common ancestor with best chain. + // queue should be cleared at this phase. + AncestorSearch(AncestorSearch), + // Doing sync rounds. + Rounds(SyncRound), +} + +struct ResponseCtx<'a> { + peer: PeerId, + req_id: ReqId, + ctx: &'a BasicContext, + data: &'a [Bytes], +} + +impl<'a> ResponseContext for ResponseCtx<'a> { + fn responder(&self) -> PeerId { self.peer } + fn req_id(&self) -> &ReqId { &self.req_id } + fn data(&self) -> &[Bytes] { self.data } + fn punish_responder(&self) { self.ctx.disable_peer(self.peer) } +} + /// Light client synchronization manager. See module docs for more details. pub struct LightSync { best_seen: Mutex>, // best seen block on the network. peers: RwLock>>, // peers which are relevant to synchronization. client: Arc, + state: Mutex, } impl Handler for LightSync { @@ -75,7 +109,8 @@ impl Handler for LightSync { let our_best = self.client.chain_info().best_block_number; if !capabilities.serve_headers || status.head_num <= our_best { - trace!(target: "sync", "Ignoring irrelevant peer: {}", ctx.peer()); + trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer()); + ctx.disconnect_peer(ctx.peer()); return; } @@ -85,27 +120,130 @@ impl Handler for LightSync { head_num: status.head_num, }; + let mut best = self.best_seen.lock(); + if best.as_ref().map_or(true, |b| status.head_td > b.1) { + *best = Some((status.head_hash, status.head_td)); + } + self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info))); + self.maintain_sync(ctx.as_basic()); } - fn on_disconnect(&self, ctx: &EventContext, _unfulfilled: &[ReqId]) { + fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { let peer_id = ctx.peer(); + let peer = match self.peers.write().remove(&peer_id).map(|p| p.into_inner()) { + Some(peer) => peer, + None => return, + }; + + let new_best = { + let mut best = self.best_seen.lock(); + let peer_best = (peer.status.head_hash, peer.status.head_td); + + if best.as_ref().map_or(false, |b| b == &peer_best) { + // search for next-best block. + let next_best: Option<(H256, U256)> = self.peers.read().values() + .map(|p| p.lock()) + .map(|p| (p.status.head_hash, p.status.head_td)) + .fold(None, |acc, x| match acc { + Some(acc) => if x.1 > acc.1 { Some(x) } else { Some(acc) }, + None => Some(x), + }); + + *best = next_best; + } + + best.clone() + }; + + if new_best.is_none() { + debug!(target: "sync", "No peers remain. Reverting to idle"); + *self.state.lock() = SyncState::Idle; + } else { + let mut state = self.state.lock(); + + *state = match mem::replace(&mut *state, SyncState::Idle) { + SyncState::Idle => SyncState::Idle, + SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search), + SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(unfulfilled)), + }; + } + + self.maintain_sync(ctx.as_basic()); } fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { - // restart search for common ancestor if necessary. - // restart download if necessary. - // if this is a peer we found irrelevant earlier, we may want to - // re-evaluate their usefulness. - if !self.peers.read().contains_key(&ctx.peer()) { return } + let last_td = { + let peers = self.peers.read(); + match peers.get(&ctx.peer()){ + None => return, + Some(peer) => { + let mut peer = peer.lock(); + let last_td = peer.status.head_td; + peer.status = ChainInfo { + head_td: announcement.head_td, + head_hash: announcement.head_hash, + head_num: announcement.head_num, + }; + last_td + } + } + }; trace!(target: "sync", "Announcement from peer {}: new chain head {:?}, reorg depth {}", ctx.peer(), (announcement.head_hash, announcement.head_num), announcement.reorg_depth); + + if last_td < announcement.head_td { + trace!(target: "sync", "Peer {} moved backwards.", ctx.peer()); + self.peers.write().remove(&ctx.peer()); + ctx.disconnect_peer(ctx.peer()); + } + + let mut best = self.best_seen.lock(); + if best.as_ref().map_or(true, |b| announcement.head_td > b.1) { + *best = Some((announcement.head_hash, announcement.head_td)); + } + + self.maintain_sync(ctx.as_basic()); } fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) { - let peer_id = ctx.peer(); + if !self.peers.read().contains_key(&ctx.peer()) { + return; + } + + { + let mut state = self.state.lock(); + + *state = match mem::replace(&mut *state, SyncState::Idle) { + SyncState::Idle => SyncState::Idle, + SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search), + SyncState::Rounds(round) => { + SyncState::Rounds(round.process_response(&ResponseCtx { + peer: ctx.peer(), + req_id: req_id, + ctx: ctx.as_basic(), + data: headers, + })) + } + }; + } + + self.maintain_sync(ctx.as_basic()); + } + + fn tick(&self, ctx: &BasicContext) { + self.maintain_sync(ctx); + } +} + +// private helpers +impl LightSync { + fn maintain_sync(&self, ctx: &BasicContext) { + const DRAIN_AMOUNT: usize = 256; + + unimplemented!() } } @@ -120,6 +258,7 @@ impl LightSync { best_seen: Mutex::new(None), peers: RwLock::new(HashMap::new()), client: client, + state: Mutex::new(SyncState::Idle), } } } diff --git a/sync/src/light_sync/sync_round.rs b/sync/src/light_sync/sync_round.rs index 5f121d00c..fb4f5c4ef 100644 --- a/sync/src/light_sync/sync_round.rs +++ b/sync/src/light_sync/sync_round.rs @@ -18,17 +18,14 @@ use std::cmp::Ordering; use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; -use std::mem; use ethcore::header::Header; -use light::client::LightChainClient; -use light::net::{EventContext, ReqId}; +use light::net::ReqId; use light::request::Headers as HeadersRequest; use network::PeerId; -use rlp::{UntrustedRlp, View}; -use util::{Bytes, H256, Mutex}; +use util::{Bytes, H256}; use super::response; @@ -47,7 +44,7 @@ pub trait ResponseContext { /// Get the peer who sent this response. fn responder(&self) -> PeerId; /// Get the request ID this response corresponds to. - fn req_id(&self) -> ReqId; + fn req_id(&self) -> &ReqId; /// Get the (unverified) response data. fn data(&self) -> &[Bytes]; /// Punish the responder. @@ -173,7 +170,7 @@ impl Fetcher { } fn process_response(mut self, ctx: &R) -> SyncRound { - let mut request = match self.pending.remove(&ctx.req_id()) { + let mut request = match self.pending.remove(ctx.req_id()) { Some(request) => request, None => return SyncRound::Fetch(self), }; @@ -267,8 +264,8 @@ impl Fetcher { SyncRound::Fetch(self) } - fn drain(mut self, headers: &mut Vec
, max: usize) -> SyncRound { - let max = ::std::cmp::min(max, self.ready.len()); + fn drain(mut self, headers: &mut Vec
, max: Option) -> SyncRound { + let max = ::std::cmp::min(max.unwrap_or(usize::max_value()), self.ready.len()); headers.extend(self.ready.drain(0..max)); if self.sparse.is_empty() && self.ready.is_empty() { @@ -321,7 +318,7 @@ impl RoundStart { fn process_response(mut self, ctx: &R) -> SyncRound { let req = match self.pending_req.take() { - Some((id, ref req)) if ctx.req_id() == id => { req.clone() } + Some((id, ref req)) if ctx.req_id() == &id => { req.clone() } other => { self.pending_req = other; return SyncRound::Start(self); @@ -445,9 +442,9 @@ impl SyncRound { } } - /// Drain up to a maximum number of headers (continuous, starting with a child of + /// Drain up to a maximum number (None -> all) of headers (continuous, starting with a child of /// the round start block) from the round, starting a new one once finished. - pub fn drain(self, v: &mut Vec
, max: usize) -> Self { + pub fn drain(self, v: &mut Vec
, max: Option) -> Self { match self { SyncRound::Fetch(fetcher) => fetcher.drain(v, max), other => other,