naive and bad request dispatcher

This commit is contained in:
Robert Habermeier 2016-12-16 15:26:39 +01:00
parent 0d7b638a2d
commit ce84215d93
2 changed files with 60 additions and 15 deletions

View File

@ -28,8 +28,6 @@ use std::collections::HashMap;
use std::mem; use std::mem;
use std::sync::Arc; use std::sync::Arc;
use ethcore::header::Header;
use light::client::LightChainClient; use light::client::LightChainClient;
use light::net::{ use light::net::{
Announcement, Handler, BasicContext, EventContext, Announcement, Handler, BasicContext, EventContext,
@ -58,10 +56,10 @@ struct Peer {
} }
impl Peer { impl Peer {
/// Create a peer object. // Create a new peer.
fn new(chain_info: ChainInfo) -> Self { fn new(chain_info: ChainInfo) -> Self {
Peer { Peer {
status: chain_info.clone(), status: chain_info,
} }
} }
} }
@ -79,11 +77,11 @@ impl AncestorSearch {
fn begin(best_num: u64) -> Self { fn begin(best_num: u64) -> Self {
match best_num { match best_num {
0 => AncestorSearch::Genesis, 0 => AncestorSearch::Genesis,
x => AncestorSearch::Queued(best_num), _ => AncestorSearch::Queued(best_num),
} }
} }
fn process_response<L>(mut self, ctx: &ResponseContext, client: &L) -> AncestorSearch fn process_response<L>(self, ctx: &ResponseContext, client: &L) -> AncestorSearch
where L: LightChainClient where L: LightChainClient
{ {
let first_num = client.chain_info().first_block_number.unwrap_or(0); let first_num = client.chain_info().first_block_number.unwrap_or(0);
@ -120,6 +118,29 @@ impl AncestorSearch {
other => other, other => other,
} }
} }
fn dispatch_request<F>(self, mut dispatcher: F) -> AncestorSearch
where F: FnMut(request::Headers) -> Option<ReqId>
{
const BATCH_SIZE: usize = 64;
match self {
AncestorSearch::Queued(start) => {
let req = request::Headers {
start: start.into(),
max: ::std::cmp::min(start as usize, BATCH_SIZE),
skip: 0,
reverse: true,
};
match dispatcher(req.clone()) {
Some(req_id) => AncestorSearch::Awaiting(req_id, start, req),
None => AncestorSearch::Queued(start),
}
}
other => other,
}
}
} }
// synchronization state machine. // synchronization state machine.
@ -152,7 +173,7 @@ pub struct LightSync<L: LightChainClient> {
best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network. best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network.
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization. peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization.
client: Arc<L>, client: Arc<L>,
rng: OsRng, rng: Mutex<OsRng>,
state: Mutex<SyncState>, state: Mutex<SyncState>,
} }
@ -377,9 +398,33 @@ impl<L: LightChainClient> LightSync<L> {
// TODO: maybe wait until the amount of cumulative requests remaining is high enough // TODO: maybe wait until the amount of cumulative requests remaining is high enough
// to avoid pumping the failure rate. // to avoid pumping the failure rate.
{ {
let peers = self.peers.read();
let mut peer_ids: Vec<_> = peers.keys().cloned().collect();
let mut rng = self.rng.lock();
// naive request dispatcher: just give to any peer which says it will
// give us responses.
let dispatcher = move |req: request::Headers| {
rng.shuffle(&mut peer_ids);
for peer in &peer_ids {
if ctx.max_requests(*peer, request::Kind::Headers) >= req.max {
match ctx.request_from(*peer, request::Request::Headers(req.clone())) {
Ok(id) => return Some(id),
Err(e) =>
trace!(target: "sync", "Error requesting headers from viable peer: {}", e),
}
}
}
None
};
*state = match mem::replace(&mut *state, SyncState::Idle) { *state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Rounds(round) SyncState::Rounds(round) =>
=> SyncState::Rounds(round.dispatch_requests(|_| unimplemented!())), SyncState::Rounds(round.dispatch_requests(dispatcher)),
SyncState::AncestorSearch(search) =>
SyncState::AncestorSearch(search.dispatch_request(dispatcher)),
other => other, other => other,
}; };
} }
@ -397,7 +442,7 @@ impl<L: LightChainClient> LightSync<L> {
best_seen: Mutex::new(None), best_seen: Mutex::new(None),
peers: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()),
client: client, client: client,
rng: try!(OsRng::new()), rng: Mutex::new(try!(OsRng::new())),
state: Mutex::new(SyncState::Idle), state: Mutex::new(SyncState::Idle),
}) })
} }

View File

@ -243,8 +243,8 @@ impl Fetcher {
SyncRound::Fetch(self) SyncRound::Fetch(self)
} }
fn dispatch_requests<D>(mut self, dispatcher: D) -> SyncRound fn dispatch_requests<D>(mut self, mut dispatcher: D) -> SyncRound
where D: Fn(HeadersRequest) -> Option<ReqId> where D: FnMut(HeadersRequest) -> Option<ReqId>
{ {
while let Some(pending_req) = self.requests.pop() { while let Some(pending_req) = self.requests.pop() {
match dispatcher(pending_req.headers_request.clone()) { match dispatcher(pending_req.headers_request.clone()) {
@ -365,8 +365,8 @@ impl RoundStart {
} }
} }
fn dispatch_requests<D>(mut self, dispatcher: D) -> SyncRound fn dispatch_requests<D>(mut self, mut dispatcher: D) -> SyncRound
where D: Fn(HeadersRequest) -> Option<ReqId> where D: FnMut(HeadersRequest) -> Option<ReqId>
{ {
if self.pending_req.is_none() { if self.pending_req.is_none() {
// beginning offset + first block expected after last header we have. // beginning offset + first block expected after last header we have.
@ -434,7 +434,7 @@ impl SyncRound {
// TODO: have dispatcher take capabilities argument? and return an error as // TODO: have dispatcher take capabilities argument? and return an error as
// to why no suitable peer can be found? (no buffer, no chain heads that high, etc) // to why no suitable peer can be found? (no buffer, no chain heads that high, etc)
pub fn dispatch_requests<D>(self, dispatcher: D) -> Self pub fn dispatch_requests<D>(self, dispatcher: D) -> Self
where D: Fn(HeadersRequest) -> Option<ReqId> where D: FnMut(HeadersRequest) -> Option<ReqId>
{ {
match self { match self {
SyncRound::Start(round_start) => round_start.dispatch_requests(dispatcher), SyncRound::Start(round_start) => round_start.dispatch_requests(dispatcher),