handle events, minimal state machine

This commit is contained in:
Robert Habermeier 2016-12-15 21:51:08 +01:00
parent 72f7391551
commit 9c7340307e
4 changed files with 170 additions and 26 deletions

View File

@ -105,6 +105,9 @@ pub trait EventContext: BasicContext {
/// Get the peer relevant to the event e.g. message sender,
/// disconnected/connected peer.
fn peer(&self) -> PeerId;
/// Treat the event context as a basic context.
fn as_basic(&self) -> &BasicContext;
}
/// Basic context.
@ -182,4 +185,8 @@ impl<'a> EventContext for Ctx<'a> {
fn peer(&self) -> PeerId {
self.peer
}
fn as_basic(&self) -> &BasicContext {
&*self
}
}

View File

@ -54,11 +54,12 @@ extern crate ethcore_ipc as ipc;
mod chain;
mod blocks;
mod block_sync;
mod light_sync;
mod sync_io;
mod snapshot;
mod transactions_stats;
pub mod light_sync;
#[cfg(test)]
mod tests;

View File

@ -25,18 +25,22 @@
//! in the same binary; unlike a full node which might communicate via IPC.
use std::collections::HashMap;
use std::fmt;
use std::mem;
use std::sync::Arc;
use ethcore::header::Header;
use light::client::LightChainClient;
use light::net::{Announcement, Error as NetError, Handler, EventContext, Capabilities, ReqId, Status};
use light::net::{
Announcement, Handler, BasicContext, EventContext,
Capabilities, ReqId, Status
};
use light::request;
use network::PeerId;
use rlp::{DecoderError, UntrustedRlp, View};
use util::{Bytes, U256, H256, Mutex, RwLock};
use self::sync_round::{SyncRound, ResponseContext};
mod response;
mod sync_round;
@ -49,7 +53,6 @@ struct ChainInfo {
}
struct Peer {
first_status: ChainInfo,
status: ChainInfo,
}
@ -57,17 +60,48 @@ impl Peer {
/// Create a peer object.
fn new(chain_info: ChainInfo) -> Self {
Peer {
first_status: chain_info.clone(),
status: chain_info.clone(),
}
}
}
// Search for a common ancestor.
struct AncestorSearch {
last_batched: u64,
req_id: ReqId,
}
// synchronization state machine.
enum SyncState {
// Idle (waiting for peers)
Idle,
// searching for common ancestor with best chain.
// queue should be cleared at this phase.
AncestorSearch(AncestorSearch),
// Doing sync rounds.
Rounds(SyncRound),
}
struct ResponseCtx<'a> {
peer: PeerId,
req_id: ReqId,
ctx: &'a BasicContext,
data: &'a [Bytes],
}
impl<'a> ResponseContext for ResponseCtx<'a> {
fn responder(&self) -> PeerId { self.peer }
fn req_id(&self) -> &ReqId { &self.req_id }
fn data(&self) -> &[Bytes] { self.data }
fn punish_responder(&self) { self.ctx.disable_peer(self.peer) }
}
/// Light client synchronization manager. See module docs for more details.
pub struct LightSync<L: LightChainClient> {
best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network.
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization.
client: Arc<L>,
state: Mutex<SyncState>,
}
impl<L: LightChainClient> Handler for LightSync<L> {
@ -75,7 +109,8 @@ impl<L: LightChainClient> Handler for LightSync<L> {
let our_best = self.client.chain_info().best_block_number;
if !capabilities.serve_headers || status.head_num <= our_best {
trace!(target: "sync", "Ignoring irrelevant peer: {}", ctx.peer());
trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer());
ctx.disconnect_peer(ctx.peer());
return;
}
@ -85,27 +120,130 @@ impl<L: LightChainClient> Handler for LightSync<L> {
head_num: status.head_num,
};
self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info)));
let mut best = self.best_seen.lock();
if best.as_ref().map_or(true, |b| status.head_td > b.1) {
*best = Some((status.head_hash, status.head_td));
}
fn on_disconnect(&self, ctx: &EventContext, _unfulfilled: &[ReqId]) {
self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info)));
self.maintain_sync(ctx.as_basic());
}
fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) {
let peer_id = ctx.peer();
let peer = match self.peers.write().remove(&peer_id).map(|p| p.into_inner()) {
Some(peer) => peer,
None => return,
};
let new_best = {
let mut best = self.best_seen.lock();
let peer_best = (peer.status.head_hash, peer.status.head_td);
if best.as_ref().map_or(false, |b| b == &peer_best) {
// search for next-best block.
let next_best: Option<(H256, U256)> = self.peers.read().values()
.map(|p| p.lock())
.map(|p| (p.status.head_hash, p.status.head_td))
.fold(None, |acc, x| match acc {
Some(acc) => if x.1 > acc.1 { Some(x) } else { Some(acc) },
None => Some(x),
});
*best = next_best;
}
best.clone()
};
if new_best.is_none() {
debug!(target: "sync", "No peers remain. Reverting to idle");
*self.state.lock() = SyncState::Idle;
} else {
let mut state = self.state.lock();
*state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Idle => SyncState::Idle,
SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search),
SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(unfulfilled)),
};
}
self.maintain_sync(ctx.as_basic());
}
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
// restart search for common ancestor if necessary.
// restart download if necessary.
// if this is a peer we found irrelevant earlier, we may want to
// re-evaluate their usefulness.
if !self.peers.read().contains_key(&ctx.peer()) { return }
let last_td = {
let peers = self.peers.read();
match peers.get(&ctx.peer()){
None => return,
Some(peer) => {
let mut peer = peer.lock();
let last_td = peer.status.head_td;
peer.status = ChainInfo {
head_td: announcement.head_td,
head_hash: announcement.head_hash,
head_num: announcement.head_num,
};
last_td
}
}
};
trace!(target: "sync", "Announcement from peer {}: new chain head {:?}, reorg depth {}",
ctx.peer(), (announcement.head_hash, announcement.head_num), announcement.reorg_depth);
if last_td < announcement.head_td {
trace!(target: "sync", "Peer {} moved backwards.", ctx.peer());
self.peers.write().remove(&ctx.peer());
ctx.disconnect_peer(ctx.peer());
}
let mut best = self.best_seen.lock();
if best.as_ref().map_or(true, |b| announcement.head_td > b.1) {
*best = Some((announcement.head_hash, announcement.head_td));
}
self.maintain_sync(ctx.as_basic());
}
fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) {
let peer_id = ctx.peer();
if !self.peers.read().contains_key(&ctx.peer()) {
return;
}
{
let mut state = self.state.lock();
*state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Idle => SyncState::Idle,
SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search),
SyncState::Rounds(round) => {
SyncState::Rounds(round.process_response(&ResponseCtx {
peer: ctx.peer(),
req_id: req_id,
ctx: ctx.as_basic(),
data: headers,
}))
}
};
}
self.maintain_sync(ctx.as_basic());
}
fn tick(&self, ctx: &BasicContext) {
self.maintain_sync(ctx);
}
}
// private helpers
impl<L: LightChainClient> LightSync<L> {
fn maintain_sync(&self, ctx: &BasicContext) {
const DRAIN_AMOUNT: usize = 256;
unimplemented!()
}
}
@ -120,6 +258,7 @@ impl<L: LightChainClient> LightSync<L> {
best_seen: Mutex::new(None),
peers: RwLock::new(HashMap::new()),
client: client,
state: Mutex::new(SyncState::Idle),
}
}
}

View File

@ -18,17 +18,14 @@
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::mem;
use ethcore::header::Header;
use light::client::LightChainClient;
use light::net::{EventContext, ReqId};
use light::net::ReqId;
use light::request::Headers as HeadersRequest;
use network::PeerId;
use rlp::{UntrustedRlp, View};
use util::{Bytes, H256, Mutex};
use util::{Bytes, H256};
use super::response;
@ -47,7 +44,7 @@ pub trait ResponseContext {
/// Get the peer who sent this response.
fn responder(&self) -> PeerId;
/// Get the request ID this response corresponds to.
fn req_id(&self) -> ReqId;
fn req_id(&self) -> &ReqId;
/// Get the (unverified) response data.
fn data(&self) -> &[Bytes];
/// Punish the responder.
@ -173,7 +170,7 @@ impl Fetcher {
}
fn process_response<R: ResponseContext>(mut self, ctx: &R) -> SyncRound {
let mut request = match self.pending.remove(&ctx.req_id()) {
let mut request = match self.pending.remove(ctx.req_id()) {
Some(request) => request,
None => return SyncRound::Fetch(self),
};
@ -267,8 +264,8 @@ impl Fetcher {
SyncRound::Fetch(self)
}
fn drain(mut self, headers: &mut Vec<Header>, max: usize) -> SyncRound {
let max = ::std::cmp::min(max, self.ready.len());
fn drain(mut self, headers: &mut Vec<Header>, max: Option<usize>) -> SyncRound {
let max = ::std::cmp::min(max.unwrap_or(usize::max_value()), self.ready.len());
headers.extend(self.ready.drain(0..max));
if self.sparse.is_empty() && self.ready.is_empty() {
@ -321,7 +318,7 @@ impl RoundStart {
fn process_response<R: ResponseContext>(mut self, ctx: &R) -> SyncRound {
let req = match self.pending_req.take() {
Some((id, ref req)) if ctx.req_id() == id => { req.clone() }
Some((id, ref req)) if ctx.req_id() == &id => { req.clone() }
other => {
self.pending_req = other;
return SyncRound::Start(self);
@ -445,9 +442,9 @@ impl SyncRound {
}
}
/// Drain up to a maximum number of headers (continuous, starting with a child of
/// Drain up to a maximum number (None -> all) of headers (continuous, starting with a child of
/// the round start block) from the round, starting a new one once finished.
pub fn drain(self, v: &mut Vec<Header>, max: usize) -> Self {
pub fn drain(self, v: &mut Vec<Header>, max: Option<usize>) -> Self {
match self {
SyncRound::Fetch(fetcher) => fetcher.drain(v, max),
other => other,