2016-12-13 21:09:43 +01:00
|
|
|
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
|
|
|
|
// This file is part of Parity.
|
|
|
|
|
|
|
|
// Parity is free software: you can redistribute it and/or modify
|
|
|
|
// it under the terms of the GNU General Public License as published by
|
|
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
|
|
// (at your option) any later version.
|
|
|
|
|
|
|
|
// Parity is distributed in the hope that it will be useful,
|
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
// GNU General Public License for more details.
|
|
|
|
|
|
|
|
// You should have received a copy of the GNU General Public License
|
|
|
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
|
|
//! Light client synchronization.
|
|
|
|
//!
|
|
|
|
//! This will synchronize the header chain using LES messages.
|
|
|
|
//! Dataflow is largely one-directional as headers are pushed into
|
|
|
|
//! the light client queue for import. Where possible, they are batched
|
|
|
|
//! in groups.
|
|
|
|
//!
|
|
|
|
//! This is written assuming that the client and sync service are running
|
2016-12-14 23:26:15 +01:00
|
|
|
//! in the same binary; unlike a full node which might communicate via IPC.
|
2016-12-13 21:09:43 +01:00
|
|
|
|
2016-12-13 22:26:06 +01:00
|
|
|
use std::collections::HashMap;
|
2016-12-15 21:51:08 +01:00
|
|
|
use std::mem;
|
2016-12-13 21:09:43 +01:00
|
|
|
use std::sync::Arc;
|
|
|
|
|
2016-12-13 21:09:57 +01:00
|
|
|
use ethcore::header::Header;
|
|
|
|
|
2016-12-14 23:26:15 +01:00
|
|
|
use light::client::LightChainClient;
|
2016-12-15 21:51:08 +01:00
|
|
|
use light::net::{
|
|
|
|
Announcement, Handler, BasicContext, EventContext,
|
|
|
|
Capabilities, ReqId, Status
|
|
|
|
};
|
2016-12-13 21:09:43 +01:00
|
|
|
use light::request;
|
|
|
|
use network::PeerId;
|
2016-12-13 21:09:57 +01:00
|
|
|
use util::{Bytes, U256, H256, Mutex, RwLock};
|
2016-12-15 22:42:24 +01:00
|
|
|
use rand::{Rng, OsRng};
|
2016-12-13 21:09:43 +01:00
|
|
|
|
2016-12-15 22:42:24 +01:00
|
|
|
use self::sync_round::{AbortReason, SyncRound, ResponseContext};
|
2016-12-15 21:51:08 +01:00
|
|
|
|
2016-12-14 23:26:15 +01:00
|
|
|
mod response;
|
|
|
|
mod sync_round;
|
2016-12-13 21:09:43 +01:00
|
|
|
|
|
|
|
/// Peer chain info.
|
|
|
|
#[derive(Clone)]
|
|
|
|
struct ChainInfo {
|
|
|
|
head_td: U256,
|
|
|
|
head_hash: H256,
|
|
|
|
head_num: u64,
|
|
|
|
}
|
|
|
|
|
2016-12-14 23:26:15 +01:00
|
|
|
struct Peer {
|
|
|
|
status: ChainInfo,
|
2016-12-13 21:09:43 +01:00
|
|
|
}
|
|
|
|
|
2016-12-14 23:26:15 +01:00
|
|
|
impl Peer {
|
|
|
|
/// Create a peer object.
|
|
|
|
fn new(chain_info: ChainInfo) -> Self {
|
|
|
|
Peer {
|
|
|
|
status: chain_info.clone(),
|
2016-12-13 21:09:43 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-15 22:42:24 +01:00
|
|
|
// Search for a common ancestor with the best chain.
|
2016-12-15 21:51:08 +01:00
|
|
|
struct AncestorSearch {
|
|
|
|
last_batched: u64,
|
2016-12-15 22:42:24 +01:00
|
|
|
req_id: Option<ReqId>,
|
2016-12-15 21:51:08 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// synchronization state machine.
|
|
|
|
enum SyncState {
|
|
|
|
// Idle (waiting for peers)
|
|
|
|
Idle,
|
|
|
|
// searching for common ancestor with best chain.
|
|
|
|
// queue should be cleared at this phase.
|
|
|
|
AncestorSearch(AncestorSearch),
|
|
|
|
// Doing sync rounds.
|
|
|
|
Rounds(SyncRound),
|
|
|
|
}
|
|
|
|
|
|
|
|
struct ResponseCtx<'a> {
|
|
|
|
peer: PeerId,
|
|
|
|
req_id: ReqId,
|
|
|
|
ctx: &'a BasicContext,
|
|
|
|
data: &'a [Bytes],
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> ResponseContext for ResponseCtx<'a> {
|
|
|
|
fn responder(&self) -> PeerId { self.peer }
|
|
|
|
fn req_id(&self) -> &ReqId { &self.req_id }
|
|
|
|
fn data(&self) -> &[Bytes] { self.data }
|
|
|
|
fn punish_responder(&self) { self.ctx.disable_peer(self.peer) }
|
|
|
|
}
|
|
|
|
|
2016-12-13 21:09:43 +01:00
|
|
|
/// Light client synchronization manager. See module docs for more details.
|
2016-12-14 23:26:15 +01:00
|
|
|
pub struct LightSync<L: LightChainClient> {
|
2016-12-13 21:09:43 +01:00
|
|
|
best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network.
|
2016-12-13 22:26:06 +01:00
|
|
|
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization.
|
2016-12-14 23:26:15 +01:00
|
|
|
client: Arc<L>,
|
2016-12-15 22:42:24 +01:00
|
|
|
rng: OsRng,
|
2016-12-15 21:51:08 +01:00
|
|
|
state: Mutex<SyncState>,
|
2016-12-13 21:09:43 +01:00
|
|
|
}
|
|
|
|
|
2016-12-14 23:26:15 +01:00
|
|
|
impl<L: LightChainClient> Handler for LightSync<L> {
|
2016-12-13 21:09:43 +01:00
|
|
|
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
|
2016-12-13 21:09:57 +01:00
|
|
|
let our_best = self.client.chain_info().best_block_number;
|
|
|
|
|
|
|
|
if !capabilities.serve_headers || status.head_num <= our_best {
|
2016-12-15 21:51:08 +01:00
|
|
|
trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer());
|
|
|
|
ctx.disconnect_peer(ctx.peer());
|
2016-12-13 21:09:43 +01:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
let chain_info = ChainInfo {
|
|
|
|
head_td: status.head_td,
|
|
|
|
head_hash: status.head_hash,
|
|
|
|
head_num: status.head_num,
|
|
|
|
};
|
2016-12-13 22:26:06 +01:00
|
|
|
|
2016-12-15 21:51:08 +01:00
|
|
|
let mut best = self.best_seen.lock();
|
|
|
|
if best.as_ref().map_or(true, |b| status.head_td > b.1) {
|
|
|
|
*best = Some((status.head_hash, status.head_td));
|
|
|
|
}
|
|
|
|
|
2016-12-14 23:26:15 +01:00
|
|
|
self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info)));
|
2016-12-15 21:51:08 +01:00
|
|
|
self.maintain_sync(ctx.as_basic());
|
2016-12-13 22:26:06 +01:00
|
|
|
}
|
|
|
|
|
2016-12-15 21:51:08 +01:00
|
|
|
fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) {
|
2016-12-14 23:26:15 +01:00
|
|
|
let peer_id = ctx.peer();
|
2016-12-13 22:26:06 +01:00
|
|
|
|
2016-12-15 21:51:08 +01:00
|
|
|
let peer = match self.peers.write().remove(&peer_id).map(|p| p.into_inner()) {
|
|
|
|
Some(peer) => peer,
|
|
|
|
None => return,
|
|
|
|
};
|
|
|
|
|
|
|
|
let new_best = {
|
|
|
|
let mut best = self.best_seen.lock();
|
|
|
|
let peer_best = (peer.status.head_hash, peer.status.head_td);
|
|
|
|
|
|
|
|
if best.as_ref().map_or(false, |b| b == &peer_best) {
|
|
|
|
// search for next-best block.
|
|
|
|
let next_best: Option<(H256, U256)> = self.peers.read().values()
|
|
|
|
.map(|p| p.lock())
|
|
|
|
.map(|p| (p.status.head_hash, p.status.head_td))
|
|
|
|
.fold(None, |acc, x| match acc {
|
|
|
|
Some(acc) => if x.1 > acc.1 { Some(x) } else { Some(acc) },
|
|
|
|
None => Some(x),
|
|
|
|
});
|
|
|
|
|
|
|
|
*best = next_best;
|
|
|
|
}
|
|
|
|
|
|
|
|
best.clone()
|
|
|
|
};
|
|
|
|
|
|
|
|
if new_best.is_none() {
|
|
|
|
debug!(target: "sync", "No peers remain. Reverting to idle");
|
|
|
|
*self.state.lock() = SyncState::Idle;
|
|
|
|
} else {
|
|
|
|
let mut state = self.state.lock();
|
|
|
|
|
|
|
|
*state = match mem::replace(&mut *state, SyncState::Idle) {
|
|
|
|
SyncState::Idle => SyncState::Idle,
|
|
|
|
SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search),
|
|
|
|
SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(unfulfilled)),
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
self.maintain_sync(ctx.as_basic());
|
2016-12-13 22:26:06 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
|
2016-12-15 21:51:08 +01:00
|
|
|
let last_td = {
|
|
|
|
let peers = self.peers.read();
|
|
|
|
match peers.get(&ctx.peer()){
|
|
|
|
None => return,
|
|
|
|
Some(peer) => {
|
|
|
|
let mut peer = peer.lock();
|
|
|
|
let last_td = peer.status.head_td;
|
|
|
|
peer.status = ChainInfo {
|
|
|
|
head_td: announcement.head_td,
|
|
|
|
head_hash: announcement.head_hash,
|
|
|
|
head_num: announcement.head_num,
|
|
|
|
};
|
|
|
|
last_td
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2016-12-13 22:26:06 +01:00
|
|
|
|
|
|
|
trace!(target: "sync", "Announcement from peer {}: new chain head {:?}, reorg depth {}",
|
|
|
|
ctx.peer(), (announcement.head_hash, announcement.head_num), announcement.reorg_depth);
|
2016-12-15 21:51:08 +01:00
|
|
|
|
|
|
|
if last_td < announcement.head_td {
|
|
|
|
trace!(target: "sync", "Peer {} moved backwards.", ctx.peer());
|
|
|
|
self.peers.write().remove(&ctx.peer());
|
|
|
|
ctx.disconnect_peer(ctx.peer());
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut best = self.best_seen.lock();
|
|
|
|
if best.as_ref().map_or(true, |b| announcement.head_td > b.1) {
|
|
|
|
*best = Some((announcement.head_hash, announcement.head_td));
|
|
|
|
}
|
|
|
|
|
|
|
|
self.maintain_sync(ctx.as_basic());
|
2016-12-13 22:26:06 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) {
|
2016-12-15 21:51:08 +01:00
|
|
|
if !self.peers.read().contains_key(&ctx.peer()) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
{
|
|
|
|
let mut state = self.state.lock();
|
|
|
|
|
|
|
|
*state = match mem::replace(&mut *state, SyncState::Idle) {
|
|
|
|
SyncState::Idle => SyncState::Idle,
|
|
|
|
SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search),
|
|
|
|
SyncState::Rounds(round) => {
|
|
|
|
SyncState::Rounds(round.process_response(&ResponseCtx {
|
|
|
|
peer: ctx.peer(),
|
|
|
|
req_id: req_id,
|
|
|
|
ctx: ctx.as_basic(),
|
|
|
|
data: headers,
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
self.maintain_sync(ctx.as_basic());
|
|
|
|
}
|
|
|
|
|
|
|
|
fn tick(&self, ctx: &BasicContext) {
|
|
|
|
self.maintain_sync(ctx);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// private helpers
|
|
|
|
impl<L: LightChainClient> LightSync<L> {
|
2016-12-15 22:42:24 +01:00
|
|
|
// Begins a search for the common ancestor and our best block.
|
|
|
|
// does not lock state, instead has a mutable reference to it passed.
|
|
|
|
fn begin_search(&self, _state: &mut SyncState) {
|
|
|
|
self.client.clear_queue();
|
|
|
|
|
|
|
|
unimplemented!();
|
|
|
|
}
|
|
|
|
|
2016-12-15 21:51:08 +01:00
|
|
|
fn maintain_sync(&self, ctx: &BasicContext) {
|
2016-12-15 22:42:24 +01:00
|
|
|
const DRAIN_AMOUNT: usize = 128;
|
|
|
|
|
|
|
|
let mut state = self.state.lock();
|
|
|
|
|
|
|
|
// drain any pending blocks into the queue.
|
|
|
|
{
|
|
|
|
let mut sink = Vec::with_capacity(DRAIN_AMOUNT);
|
2016-12-15 21:51:08 +01:00
|
|
|
|
2016-12-15 22:42:24 +01:00
|
|
|
'a:
|
|
|
|
loop {
|
|
|
|
let queue_info = self.client.queue_info();
|
|
|
|
if queue_info.is_full() { break }
|
|
|
|
|
|
|
|
*state = match mem::replace(&mut *state, SyncState::Idle) {
|
|
|
|
SyncState::Rounds(round)
|
|
|
|
=> SyncState::Rounds(round.drain(&mut sink, Some(DRAIN_AMOUNT))),
|
|
|
|
other => other,
|
|
|
|
};
|
|
|
|
|
|
|
|
if sink.is_empty() { break }
|
|
|
|
|
|
|
|
for header in sink.drain(..) {
|
|
|
|
if let Err(e) = self.client.queue_header(header) {
|
|
|
|
debug!(target: "sync", "Found bad header ({:?}). Reset to search state.", e);
|
|
|
|
|
|
|
|
self.begin_search(&mut state);
|
|
|
|
break 'a;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// check for aborted sync round.
|
|
|
|
{
|
|
|
|
match mem::replace(&mut *state, SyncState::Idle) {
|
|
|
|
SyncState::Rounds(SyncRound::Abort(reason)) => {
|
|
|
|
match reason {
|
|
|
|
AbortReason::BadScaffold(bad_peers) => {
|
|
|
|
debug!(target: "sync", "Disabling peers responsible for bad scaffold");
|
|
|
|
for peer in bad_peers {
|
|
|
|
ctx.disable_peer(peer);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
AbortReason::NoResponses => {}
|
|
|
|
}
|
|
|
|
|
|
|
|
debug!(target: "sync", "Beginning search after aborted sync round");
|
|
|
|
self.begin_search(&mut state);
|
|
|
|
}
|
|
|
|
other => *state = other, // restore displaced state.
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// allow dispatching of requests.
|
|
|
|
{
|
|
|
|
*state = match mem::replace(&mut *state, SyncState::Idle) {
|
|
|
|
SyncState::Rounds(round)
|
|
|
|
=> SyncState::Rounds(round.dispatch_requests(|_| unimplemented!())),
|
|
|
|
other => other,
|
|
|
|
};
|
|
|
|
}
|
2016-12-13 21:09:43 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// public API
|
2016-12-14 23:26:15 +01:00
|
|
|
impl<L: LightChainClient> LightSync<L> {
|
2016-12-13 21:09:43 +01:00
|
|
|
/// Create a new instance of `LightSync`.
|
|
|
|
///
|
|
|
|
/// This won't do anything until registered as a handler
|
2016-12-13 22:26:06 +01:00
|
|
|
/// so it can act on events.
|
2016-12-15 22:42:24 +01:00
|
|
|
pub fn new(client: Arc<L>) -> Result<Self, ::std::io::Error> {
|
|
|
|
Ok(LightSync {
|
2016-12-13 21:09:43 +01:00
|
|
|
best_seen: Mutex::new(None),
|
2016-12-13 21:09:57 +01:00
|
|
|
peers: RwLock::new(HashMap::new()),
|
2016-12-13 21:09:43 +01:00
|
|
|
client: client,
|
2016-12-15 22:42:24 +01:00
|
|
|
rng: try!(OsRng::new()),
|
2016-12-15 21:51:08 +01:00
|
|
|
state: Mutex::new(SyncState::Idle),
|
2016-12-15 22:42:24 +01:00
|
|
|
})
|
2016-12-13 21:09:43 +01:00
|
|
|
}
|
|
|
|
}
|