skeleton for maintain_sync: all but dispatch

This commit is contained in:
Robert Habermeier 2016-12-15 22:42:24 +01:00
parent 9c7340307e
commit 8622ab66dc

View File

@ -38,8 +38,9 @@ use light::net::{
use light::request; use light::request;
use network::PeerId; use network::PeerId;
use util::{Bytes, U256, H256, Mutex, RwLock}; use util::{Bytes, U256, H256, Mutex, RwLock};
use rand::{Rng, OsRng};
use self::sync_round::{SyncRound, ResponseContext}; use self::sync_round::{AbortReason, SyncRound, ResponseContext};
mod response; mod response;
mod sync_round; mod sync_round;
@ -65,10 +66,10 @@ impl Peer {
} }
} }
// Search for a common ancestor. // Search for a common ancestor with the best chain.
struct AncestorSearch { struct AncestorSearch {
last_batched: u64, last_batched: u64,
req_id: ReqId, req_id: Option<ReqId>,
} }
// synchronization state machine. // synchronization state machine.
@ -101,6 +102,7 @@ pub struct LightSync<L: LightChainClient> {
best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network. best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network.
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization. peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization.
client: Arc<L>, client: Arc<L>,
rng: OsRng,
state: Mutex<SyncState>, state: Mutex<SyncState>,
} }
@ -240,10 +242,76 @@ impl<L: LightChainClient> Handler for LightSync<L> {
// private helpers // private helpers
impl<L: LightChainClient> LightSync<L> { impl<L: LightChainClient> LightSync<L> {
fn maintain_sync(&self, ctx: &BasicContext) { // Begins a search for the common ancestor and our best block.
const DRAIN_AMOUNT: usize = 256; // does not lock state, instead has a mutable reference to it passed.
fn begin_search(&self, _state: &mut SyncState) {
self.client.clear_queue();
unimplemented!() unimplemented!();
}
fn maintain_sync(&self, ctx: &BasicContext) {
const DRAIN_AMOUNT: usize = 128;
let mut state = self.state.lock();
// drain any pending blocks into the queue.
{
let mut sink = Vec::with_capacity(DRAIN_AMOUNT);
'a:
loop {
let queue_info = self.client.queue_info();
if queue_info.is_full() { break }
*state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Rounds(round)
=> SyncState::Rounds(round.drain(&mut sink, Some(DRAIN_AMOUNT))),
other => other,
};
if sink.is_empty() { break }
for header in sink.drain(..) {
if let Err(e) = self.client.queue_header(header) {
debug!(target: "sync", "Found bad header ({:?}). Reset to search state.", e);
self.begin_search(&mut state);
break 'a;
}
}
}
}
// check for aborted sync round.
{
match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Rounds(SyncRound::Abort(reason)) => {
match reason {
AbortReason::BadScaffold(bad_peers) => {
debug!(target: "sync", "Disabling peers responsible for bad scaffold");
for peer in bad_peers {
ctx.disable_peer(peer);
}
}
AbortReason::NoResponses => {}
}
debug!(target: "sync", "Beginning search after aborted sync round");
self.begin_search(&mut state);
}
other => *state = other, // restore displaced state.
}
}
// allow dispatching of requests.
{
*state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Rounds(round)
=> SyncState::Rounds(round.dispatch_requests(|_| unimplemented!())),
other => other,
};
}
} }
} }
@ -253,12 +321,13 @@ impl<L: LightChainClient> LightSync<L> {
/// ///
/// This won't do anything until registered as a handler /// This won't do anything until registered as a handler
/// so it can act on events. /// so it can act on events.
pub fn new(client: Arc<L>) -> Self { pub fn new(client: Arc<L>) -> Result<Self, ::std::io::Error> {
LightSync { Ok(LightSync {
best_seen: Mutex::new(None), best_seen: Mutex::new(None),
peers: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()),
client: client, client: client,
rng: try!(OsRng::new()),
state: Mutex::new(SyncState::Idle), state: Mutex::new(SyncState::Idle),
} })
} }
} }