diff --git a/ethcore/light/src/client/header_chain.rs b/ethcore/light/src/client/header_chain.rs index a2a953653..b8084cedd 100644 --- a/ethcore/light/src/client/header_chain.rs +++ b/ethcore/light/src/client/header_chain.rs @@ -131,7 +131,7 @@ impl HeaderChain { let total_difficulty = parent_td + view.difficulty(); // insert headers and candidates entries. - candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash}) + candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash }) .candidates.push(Candidate { hash: hash, parent_hash: parent_hash, @@ -144,17 +144,26 @@ impl HeaderChain { // respective candidates vectors. if self.best_block.read().total_difficulty < total_difficulty { let mut canon_hash = hash; - for (_, entry) in candidates.iter_mut().rev().skip_while(|&(height, _)| *height > number) { - if entry.canonical_hash == canon_hash { break; } + for (&height, entry) in candidates.iter_mut().rev().skip_while(|&(height, _)| *height > number) { + if height != number && entry.canonical_hash == canon_hash { break; } - let canon = entry.candidates.iter().find(|x| x.hash == canon_hash) + trace!(target: "chain", "Setting new canonical block {} for block height {}", + canon_hash, height); + + let canon_pos = entry.candidates.iter().position(|x| x.hash == canon_hash) .expect("blocks are only inserted if parent is present; or this is the block we just added; qed"); + // move the new canonical entry to the front and set the + // era's canonical hash. + entry.candidates.swap(0, canon_pos); + entry.canonical_hash = canon_hash; + // what about reorgs > cht::SIZE + HISTORY? // resetting to the last block of a given CHT should be possible. - canon_hash = canon.parent_hash; + canon_hash = entry.candidates[0].parent_hash; } + trace!(target: "chain", "New best block: ({}, {}), TD {}", number, hash, total_difficulty); *self.best_block.write() = BlockDescriptor { hash: hash, number: number, @@ -360,6 +369,15 @@ mod tests { } } - assert_eq!(chain.best_block().number, 12); + let (mut num, mut canon_hash) = (chain.best_block().number, chain.best_block().hash); + assert_eq!(num, 12); + + while num > 0 { + let header: Header = ::rlp::decode(&chain.get_header(BlockId::Number(num)).unwrap()); + assert_eq!(header.hash(), canon_hash); + + canon_hash = *header.parent_hash(); + num -= 1; + } } } diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 8c9369790..fe562536a 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -144,7 +144,7 @@ impl TestBlockChainClient { genesis_hash: H256::new(), extra_data: extra_data, last_hash: RwLock::new(H256::new()), - difficulty: RwLock::new(From::from(0)), + difficulty: RwLock::new(spec.genesis_header().difficulty().clone()), balances: RwLock::new(HashMap::new()), nonces: RwLock::new(HashMap::new()), storage: RwLock::new(HashMap::new()), diff --git a/ethcore/src/verification/queue/kind.rs b/ethcore/src/verification/queue/kind.rs index 3ce86ad47..58d88098b 100644 --- a/ethcore/src/verification/queue/kind.rs +++ b/ethcore/src/verification/queue/kind.rs @@ -177,7 +177,6 @@ pub mod headers { } /// A mode for verifying headers. - #[allow(dead_code)] pub struct Headers; impl Kind for Headers { diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index de7d7b05a..9744cf455 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -55,13 +55,25 @@ mod sync_round; mod tests; /// Peer chain info. -#[derive(Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] struct ChainInfo { head_td: U256, head_hash: H256, head_num: u64, } +impl PartialOrd for ChainInfo { + fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> { + self.head_td.partial_cmp(&other.head_td) + } +} + +impl Ord for ChainInfo { + fn cmp(&self, other: &Self) -> ::std::cmp::Ordering { + self.head_td.cmp(&other.head_td) + } +} + struct Peer { status: ChainInfo, } @@ -74,6 +86,7 @@ impl Peer { } } } + // search for a common ancestor with the best chain. #[derive(Debug)] enum AncestorSearch { @@ -107,13 +120,18 @@ impl AncestorSearch { return AncestorSearch::FoundCommon(header.number(), header.hash()); } - if header.number() <= first_num { + if header.number() < first_num { debug!(target: "sync", "Prehistoric common ancestor with best chain."); return AncestorSearch::Prehistoric; } } - AncestorSearch::Queued(start - headers.len() as u64) + let probe = start - headers.len() as u64; + if probe == 0 { + AncestorSearch::Genesis + } else { + AncestorSearch::Queued(probe) + } } Err(e) => { trace!(target: "sync", "Bad headers response from {}: {}", ctx.responder(), e); @@ -137,12 +155,13 @@ impl AncestorSearch { match self { AncestorSearch::Queued(start) => { + let batch_size = ::std::cmp::min(start as usize, BATCH_SIZE); trace!(target: "sync", "Requesting {} reverse headers from {} to find common ancestor", - BATCH_SIZE, start); + batch_size, start); let req = request::Headers { start: start.into(), - max: ::std::cmp::min(start as usize, BATCH_SIZE), + max: batch_size, skip: 0, reverse: true, }; @@ -185,7 +204,7 @@ impl<'a> ResponseContext for ResponseCtx<'a> { /// Light client synchronization manager. See module docs for more details. pub struct LightSync { - best_seen: Mutex>, // best seen block on the network. + best_seen: Mutex>, // best seen block on the network. peers: RwLock>>, // peers which are relevant to synchronization. client: Arc, rng: Mutex, @@ -194,9 +213,7 @@ pub struct LightSync { impl Handler for LightSync { fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { - let our_best = self.client.chain_info().best_block_number; - - if !capabilities.serve_headers || status.head_num <= our_best { + if !capabilities.serve_headers { trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer()); ctx.disconnect_peer(ctx.peer()); return; @@ -210,9 +227,7 @@ impl Handler for LightSync { { let mut best = self.best_seen.lock(); - if best.as_ref().map_or(true, |b| status.head_td > b.1) { - *best = Some((status.head_hash, status.head_td)); - } + *best = ::std::cmp::max(best.clone(), Some(chain_info.clone())); } self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info))); @@ -231,17 +246,13 @@ impl Handler for LightSync { let new_best = { let mut best = self.best_seen.lock(); - let peer_best = (peer.status.head_hash, peer.status.head_td); - if best.as_ref().map_or(false, |b| b == &peer_best) { + if best.as_ref().map_or(false, |b| b == &peer.status) { // search for next-best block. - let next_best: Option<(H256, U256)> = self.peers.read().values() - .map(|p| p.lock()) - .map(|p| (p.status.head_hash, p.status.head_td)) - .fold(None, |acc, x| match acc { - Some(acc) => if x.1 > acc.1 { Some(x) } else { Some(acc) }, - None => Some(x), - }); + let next_best: Option = self.peers.read().values() + .map(|p| p.lock().status.clone()) + .map(Some) + .fold(None, ::std::cmp::max); *best = next_best; } @@ -266,7 +277,7 @@ impl Handler for LightSync { } fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { - let last_td = { + let (last_td, chain_info) = { let peers = self.peers.read(); match peers.get(&ctx.peer()) { None => return, @@ -278,7 +289,7 @@ impl Handler for LightSync { head_hash: announcement.head_hash, head_num: announcement.head_num, }; - last_td + (last_td, peer.status.clone()) } } }; @@ -290,13 +301,12 @@ impl Handler for LightSync { trace!(target: "sync", "Peer {} moved backwards.", ctx.peer()); self.peers.write().remove(&ctx.peer()); ctx.disconnect_peer(ctx.peer()); + return } { let mut best = self.best_seen.lock(); - if best.as_ref().map_or(true, |b| announcement.head_td > b.1) { - *best = Some((announcement.head_hash, announcement.head_td)); - } + *best = ::std::cmp::max(best.clone(), Some(chain_info)); } self.maintain_sync(ctx.as_basic()); @@ -352,10 +362,12 @@ impl LightSync { *state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number)); } + // handles request dispatch, block import, and state machine transitions. fn maintain_sync(&self, ctx: &BasicContext) { const DRAIN_AMOUNT: usize = 128; let mut state = self.state.lock(); + let chain_info = self.client.chain_info(); debug!(target: "sync", "Maintaining sync ({:?})", &*state); // drain any pending blocks into the queue. @@ -364,8 +376,7 @@ impl LightSync { 'a: loop { - let queue_info = self.client.queue_info(); - if queue_info.is_full() { break } + if self.client.queue_info().is_full() { break } *state = match mem::replace(&mut *state, SyncState::Idle) { SyncState::Rounds(round) @@ -389,12 +400,23 @@ impl LightSync { // handle state transitions. { - let chain_info = self.client.chain_info(); - let best_td = chain_info.total_difficulty; + let best_td = chain_info.pending_total_difficulty; + let sync_target = match *self.best_seen.lock() { + Some(ref target) if target.head_td > best_td => (target.head_num, target.head_hash), + _ => { + trace!(target: "sync", "No target to sync to."); + *state = SyncState::Idle; + return; + } + }; + match mem::replace(&mut *state, SyncState::Idle) { - _ if self.best_seen.lock().as_ref().map_or(true, |&(_, td)| best_td >= td) - => *state = SyncState::Idle, - SyncState::Rounds(SyncRound::Abort(reason, _)) => { + SyncState::Rounds(SyncRound::Abort(reason, remaining)) => { + if remaining.len() > 0 { + *state = SyncState::Rounds(SyncRound::Abort(reason, remaining)); + return; + } + match reason { AbortReason::BadScaffold(bad_peers) => { debug!(target: "sync", "Disabling peers responsible for bad scaffold"); @@ -403,20 +425,23 @@ impl LightSync { } } AbortReason::NoResponses => {} + AbortReason::TargetReached => { + debug!(target: "sync", "Sync target reached. Going idle"); + *state = SyncState::Idle; + return; + } } debug!(target: "sync", "Beginning search after aborted sync round"); self.begin_search(&mut state); } SyncState::AncestorSearch(AncestorSearch::FoundCommon(num, hash)) => { - // TODO: compare to best block and switch to another downloading - // method when close. - *state = SyncState::Rounds(SyncRound::begin(num, hash)); + *state = SyncState::Rounds(SyncRound::begin((num, hash), sync_target)); } SyncState::AncestorSearch(AncestorSearch::Genesis) => { // Same here. let g_hash = chain_info.genesis_hash; - *state = SyncState::Rounds(SyncRound::begin(0, g_hash)); + *state = SyncState::Rounds(SyncRound::begin((0, g_hash), sync_target)); } SyncState::Idle => self.begin_search(&mut state), other => *state = other, // restore displaced state. @@ -424,11 +449,15 @@ impl LightSync { } // allow dispatching of requests. - // TODO: maybe wait until the amount of cumulative requests remaining is high enough - // to avoid pumping the failure rate. { let peers = self.peers.read(); - let mut peer_ids: Vec<_> = peers.keys().cloned().collect(); + let mut peer_ids: Vec<_> = peers.iter().filter_map(|(id, p)| { + if p.lock().status.head_td > chain_info.pending_total_difficulty { + Some(*id) + } else { + None + } + }).collect(); let mut rng = self.rng.lock(); // naive request dispatcher: just give to any peer which says it will diff --git a/sync/src/light_sync/sync_round.rs b/sync/src/light_sync/sync_round.rs index 29d93daa8..f91dcb18a 100644 --- a/sync/src/light_sync/sync_round.rs +++ b/sync/src/light_sync/sync_round.rs @@ -30,13 +30,6 @@ use util::{Bytes, H256}; use super::response; -/// amount of blocks between each scaffold entry. -// TODO: move these into parameters for `RoundStart::new`? -pub const ROUND_SKIP: u64 = 255; - -// amount of scaffold frames: these are the blank spaces in "X___X___X" -const ROUND_FRAMES: usize = 255; - // number of attempts to make to get a full scaffold for a sync round. const SCAFFOLD_ATTEMPTS: usize = 3; @@ -59,6 +52,8 @@ pub enum AbortReason { BadScaffold(Vec), /// No incoming data. NoResponses, + /// Sync rounds completed. + TargetReached, } // A request for headers with a known starting header hash. @@ -96,6 +91,7 @@ pub struct Fetcher { scaffold_contributors: Vec, ready: VecDeque
, end: (u64, H256), + target: (u64, H256), } impl Fetcher { @@ -103,7 +99,7 @@ impl Fetcher { // with a list of peers who helped produce the chain. // The headers must be valid RLP at this point and must have a consistent // non-zero gap between them. Will abort the round if found wrong. - fn new(sparse_headers: Vec
, contributors: Vec) -> SyncRound { + fn new(sparse_headers: Vec
, contributors: Vec, target: (u64, H256)) -> SyncRound { let mut requests = BinaryHeap::with_capacity(sparse_headers.len() - 1); for pair in sparse_headers.windows(2) { @@ -144,6 +140,7 @@ impl Fetcher { scaffold_contributors: contributors, ready: VecDeque::new(), end: end, + target: target, }) } @@ -170,6 +167,8 @@ impl Fetcher { if self.sparse.len() == 1 { self.ready.push_back(self.sparse.pop_back().expect("sparse known to have one entry; qed")) } + + trace!(target: "sync", "{} headers ready to drain", self.ready.len()); } fn process_response(mut self, ctx: &R) -> SyncRound { @@ -178,11 +177,16 @@ impl Fetcher { None => return SyncRound::Fetch(self), }; + trace!(target: "sync", "Received response for subchain ({} -> {})", + request.subchain_parent.0 + 1, request.subchain_end.0); + let headers = ctx.data(); if headers.len() == 0 { trace!(target: "sync", "Punishing peer {} for empty response", ctx.responder()); ctx.punish_responder(); + + self.requests.push(request); return SyncRound::Fetch(self); } @@ -274,32 +278,66 @@ impl Fetcher { if self.sparse.is_empty() && self.ready.is_empty() { trace!(target: "sync", "sync round complete. Starting anew from {:?}", self.end); - SyncRound::Start(RoundStart::new(self.end)) + SyncRound::begin(self.end, self.target) } else { SyncRound::Fetch(self) } } } +// Compute scaffold parameters from non-zero distance between start and target block: (skip, pivots). +fn scaffold_params(diff: u64) -> (u64, usize) { + // default parameters. + // amount of blocks between each scaffold pivot. + const ROUND_SKIP: u64 = 255; + // amount of scaffold pivots: these are the Xs in "X___X___X" + const ROUND_PIVOTS: usize = 256; + + let rem = diff % (ROUND_SKIP + 1); + if diff <= ROUND_SKIP { + // just request headers from the start to the target. + (0, rem as usize) + } else { + // the number of pivots necessary to exactly hit or overshoot the target. + let pivots_to_target = (diff / (ROUND_SKIP + 1)) + if rem == 0 { 0 } else { 1 }; + let num_pivots = ::std::cmp::min(pivots_to_target, ROUND_PIVOTS as u64) as usize; + (ROUND_SKIP, num_pivots) + } +} + /// Round started: get stepped header chain. -/// from a start block with number X we request 256 headers stepped by 256 from -/// block X + 1. +/// from a start block with number X we request ROUND_PIVOTS headers stepped by ROUND_SKIP from +/// block X + 1 to a target >= X + 1. +/// If the sync target is within ROUND_SKIP of the start, we request +/// only those blocks. If the sync target is within (ROUND_SKIP + 1) * (ROUND_PIVOTS - 1) of +/// the start, we reduce the number of pivots so the target is outside it. pub struct RoundStart { start_block: (u64, H256), + target: (u64, H256), pending_req: Option<(ReqId, HeadersRequest)>, sparse_headers: Vec
, contributors: HashSet, attempt: usize, + skip: u64, + pivots: usize, } impl RoundStart { - fn new(start: (u64, H256)) -> Self { + fn new(start: (u64, H256), target: (u64, H256)) -> Self { + let (skip, pivots) = scaffold_params(target.0 - start.0); + + trace!(target: "sync", "Beginning sync round: {} pivots and {} skip from block {}", + pivots, skip, start.0); + RoundStart { - start_block: start.clone(), + start_block: start, + target: target, pending_req: None, sparse_headers: Vec::new(), contributors: HashSet::new(), attempt: 0, + skip: skip, + pivots: pivots, } } @@ -309,10 +347,16 @@ impl RoundStart { self.attempt += 1; if self.attempt >= SCAFFOLD_ATTEMPTS { - if self.sparse_headers.len() > 1 { - Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect()) + return if self.sparse_headers.len() > 1 { + Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect(), self.target) } else { - SyncRound::Abort(AbortReason::NoResponses, self.sparse_headers.into()) + let fetched_headers = if self.skip == 0 { + self.sparse_headers.into() + } else { + VecDeque::new() + }; + + SyncRound::abort(AbortReason::NoResponses, fetched_headers) } } else { SyncRound::Start(self) @@ -339,11 +383,18 @@ impl RoundStart { self.contributors.insert(ctx.responder()); self.sparse_headers.extend(headers); - if self.sparse_headers.len() == ROUND_FRAMES + 1 { - trace!(target: "sync", "Beginning fetch of blocks between {} sparse headers", - self.sparse_headers.len()); - - return Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect()); + if self.sparse_headers.len() == self.pivots { + return if self.skip == 0 { + SyncRound::abort(AbortReason::TargetReached, self.sparse_headers.into()) + } else { + trace!(target: "sync", "Beginning fetch of blocks between {} sparse headers", + self.sparse_headers.len()); + Fetcher::new( + self.sparse_headers, + self.contributors.into_iter().collect(), + self.target + ) + } } } Err(e) => { @@ -376,20 +427,20 @@ impl RoundStart { if self.pending_req.is_none() { // beginning offset + first block expected after last header we have. let start = (self.start_block.0 + 1) - + self.sparse_headers.len() as u64 * (ROUND_SKIP + 1); + + self.sparse_headers.len() as u64 * (self.skip + 1); - let max = (ROUND_FRAMES - 1) - self.sparse_headers.len(); + let max = self.pivots - self.sparse_headers.len(); let headers_request = HeadersRequest { start: start.into(), max: max, - skip: ROUND_SKIP, + skip: self.skip, reverse: false, }; if let Some(req_id) = dispatcher(headers_request.clone()) { trace!(target: "sync", "Requesting scaffold: {} headers forward from {}, skip={}", - max, start, ROUND_SKIP); + max, start, self.skip); self.pending_req = Some((req_id, headers_request)); } @@ -411,14 +462,18 @@ pub enum SyncRound { impl SyncRound { fn abort(reason: AbortReason, remaining: VecDeque
) -> Self { - trace!(target: "sync", "Aborting sync round: {:?}. To drain: {:?}", reason, remaining); + trace!(target: "sync", "Aborting sync round: {:?}. To drain: {}", reason, remaining.len()); SyncRound::Abort(reason, remaining) } - /// Begin sync rounds from a starting block. - pub fn begin(num: u64, hash: H256) -> Self { - SyncRound::Start(RoundStart::new((num, hash))) + /// Begin sync rounds from a starting block, but not to go past a given target + pub fn begin(start: (u64, H256), target: (u64, H256)) -> Self { + if target.0 <= start.0 { + SyncRound::abort(AbortReason::TargetReached, VecDeque::new()) + } else { + SyncRound::Start(RoundStart::new(start, target)) + } } /// Process an answer to a request. Unknown requests will be ignored. @@ -478,3 +533,21 @@ impl fmt::Debug for SyncRound { } } } + +#[cfg(test)] +mod tests { + use super::scaffold_params; + + #[test] + fn scaffold_config() { + // within a certain distance of the head, we download + // sequentially. + assert_eq!(scaffold_params(1), (0, 1)); + assert_eq!(scaffold_params(6), (0, 6)); + + // when scaffolds are useful, download enough frames to get + // within a close distance of the goal. + assert_eq!(scaffold_params(1000), (255, 4)); + assert_eq!(scaffold_params(1024), (255, 4)); + } +} diff --git a/sync/src/light_sync/tests/mod.rs b/sync/src/light_sync/tests/mod.rs index 9eefbec41..283e0adb8 100644 --- a/sync/src/light_sync/tests/mod.rs +++ b/sync/src/light_sync/tests/mod.rs @@ -14,6 +14,50 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -#![allow(dead_code)] +use tests::helpers::TestNet; + +use ethcore::client::{BlockChainClient, BlockId, EachBlockWith}; mod test_net; + +#[test] +fn basic_sync() { + let mut net = TestNet::light(1, 2); + net.peer(1).chain().add_blocks(5000, EachBlockWith::Nothing); + net.peer(2).chain().add_blocks(6000, EachBlockWith::Nothing); + + net.sync(); + + assert!(net.peer(0).light_chain().get_header(BlockId::Number(6000)).is_some()); +} + +#[test] +fn fork_post_cht() { + const CHAIN_LENGTH: u64 = 50; // shouldn't be longer than ::light::cht::size(); + + let mut net = TestNet::light(1, 2); + + // peer 2 is on a higher TD chain. + net.peer(1).chain().add_blocks(CHAIN_LENGTH as usize, EachBlockWith::Nothing); + net.peer(2).chain().add_blocks(CHAIN_LENGTH as usize + 1, EachBlockWith::Uncle); + + // get the light peer on peer 1's chain. + for id in (0..CHAIN_LENGTH).map(|x| x + 1).map(BlockId::Number) { + let (light_peer, full_peer) = (net.peer(0), net.peer(1)); + let light_chain = light_peer.light_chain(); + let header = full_peer.chain().block_header(id).unwrap().decode(); + let _ = light_chain.import_header(header); + light_chain.flush_queue(); + light_chain.import_verified(); + assert!(light_chain.get_header(id).is_some()); + } + + net.sync(); + + for id in (0..CHAIN_LENGTH).map(|x| x + 1).map(BlockId::Number) { + assert_eq!( + net.peer(0).light_chain().get_header(id), + net.peer(2).chain().block_header(id).map(|h| h.into_inner()) + ); + } +} diff --git a/sync/src/light_sync/tests/test_net.rs b/sync/src/light_sync/tests/test_net.rs index fa5724666..264160510 100644 --- a/sync/src/light_sync/tests/test_net.rs +++ b/sync/src/light_sync/tests/test_net.rs @@ -174,13 +174,24 @@ impl PeerLike for Peer { } fn is_done(&self) -> bool { - self.queue.read().is_empty() + self.queue.read().is_empty() && match self.data { + PeerData::Light(_, ref client) => { + // should create a test light client which just imports + // headers directly and doesn't have a queue to drain. + client.import_verified(); + client.queue_info().is_empty() + } + _ => true, + } } fn sync_step(&self) { if let PeerData::Light(_, ref client) = self.data { client.flush_queue(); - client.import_verified(); + + while !client.queue_info().is_empty() { + client.import_verified() + } } }