From ce84215d93330b9dc448b698471c1c4633e3ae47 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 16 Dec 2016 15:26:39 +0100 Subject: [PATCH] naive and bad request dispatcher --- sync/src/light_sync/mod.rs | 65 ++++++++++++++++++++++++++----- sync/src/light_sync/sync_round.rs | 10 ++--- 2 files changed, 60 insertions(+), 15 deletions(-) diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index 3ab2ffe82..5ca441949 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -28,8 +28,6 @@ use std::collections::HashMap; use std::mem; use std::sync::Arc; -use ethcore::header::Header; - use light::client::LightChainClient; use light::net::{ Announcement, Handler, BasicContext, EventContext, @@ -58,10 +56,10 @@ struct Peer { } impl Peer { - /// Create a peer object. + // Create a new peer. fn new(chain_info: ChainInfo) -> Self { Peer { - status: chain_info.clone(), + status: chain_info, } } } @@ -79,11 +77,11 @@ impl AncestorSearch { fn begin(best_num: u64) -> Self { match best_num { 0 => AncestorSearch::Genesis, - x => AncestorSearch::Queued(best_num), + _ => AncestorSearch::Queued(best_num), } } - fn process_response(mut self, ctx: &ResponseContext, client: &L) -> AncestorSearch + fn process_response(self, ctx: &ResponseContext, client: &L) -> AncestorSearch where L: LightChainClient { let first_num = client.chain_info().first_block_number.unwrap_or(0); @@ -120,6 +118,29 @@ impl AncestorSearch { other => other, } } + + fn dispatch_request(self, mut dispatcher: F) -> AncestorSearch + where F: FnMut(request::Headers) -> Option + { + const BATCH_SIZE: usize = 64; + + match self { + AncestorSearch::Queued(start) => { + let req = request::Headers { + start: start.into(), + max: ::std::cmp::min(start as usize, BATCH_SIZE), + skip: 0, + reverse: true, + }; + + match dispatcher(req.clone()) { + Some(req_id) => AncestorSearch::Awaiting(req_id, start, req), + None => AncestorSearch::Queued(start), + } + } + other => other, + } + } } // synchronization state machine. @@ -152,7 +173,7 @@ pub struct LightSync { best_seen: Mutex>, // best seen block on the network. peers: RwLock>>, // peers which are relevant to synchronization. client: Arc, - rng: OsRng, + rng: Mutex, state: Mutex, } @@ -377,9 +398,33 @@ impl LightSync { // TODO: maybe wait until the amount of cumulative requests remaining is high enough // to avoid pumping the failure rate. { + let peers = self.peers.read(); + let mut peer_ids: Vec<_> = peers.keys().cloned().collect(); + let mut rng = self.rng.lock(); + + // naive request dispatcher: just give to any peer which says it will + // give us responses. + let dispatcher = move |req: request::Headers| { + rng.shuffle(&mut peer_ids); + + for peer in &peer_ids { + if ctx.max_requests(*peer, request::Kind::Headers) >= req.max { + match ctx.request_from(*peer, request::Request::Headers(req.clone())) { + Ok(id) => return Some(id), + Err(e) => + trace!(target: "sync", "Error requesting headers from viable peer: {}", e), + } + } + } + + None + }; + *state = match mem::replace(&mut *state, SyncState::Idle) { - SyncState::Rounds(round) - => SyncState::Rounds(round.dispatch_requests(|_| unimplemented!())), + SyncState::Rounds(round) => + SyncState::Rounds(round.dispatch_requests(dispatcher)), + SyncState::AncestorSearch(search) => + SyncState::AncestorSearch(search.dispatch_request(dispatcher)), other => other, }; } @@ -397,7 +442,7 @@ impl LightSync { best_seen: Mutex::new(None), peers: RwLock::new(HashMap::new()), client: client, - rng: try!(OsRng::new()), + rng: Mutex::new(try!(OsRng::new())), state: Mutex::new(SyncState::Idle), }) } diff --git a/sync/src/light_sync/sync_round.rs b/sync/src/light_sync/sync_round.rs index 7f86faf2d..03ca1dea4 100644 --- a/sync/src/light_sync/sync_round.rs +++ b/sync/src/light_sync/sync_round.rs @@ -243,8 +243,8 @@ impl Fetcher { SyncRound::Fetch(self) } - fn dispatch_requests(mut self, dispatcher: D) -> SyncRound - where D: Fn(HeadersRequest) -> Option + fn dispatch_requests(mut self, mut dispatcher: D) -> SyncRound + where D: FnMut(HeadersRequest) -> Option { while let Some(pending_req) = self.requests.pop() { match dispatcher(pending_req.headers_request.clone()) { @@ -365,8 +365,8 @@ impl RoundStart { } } - fn dispatch_requests(mut self, dispatcher: D) -> SyncRound - where D: Fn(HeadersRequest) -> Option + fn dispatch_requests(mut self, mut dispatcher: D) -> SyncRound + where D: FnMut(HeadersRequest) -> Option { if self.pending_req.is_none() { // beginning offset + first block expected after last header we have. @@ -434,7 +434,7 @@ impl SyncRound { // TODO: have dispatcher take capabilities argument? and return an error as // to why no suitable peer can be found? (no buffer, no chain heads that high, etc) pub fn dispatch_requests(self, dispatcher: D) -> Self - where D: Fn(HeadersRequest) -> Option + where D: FnMut(HeadersRequest) -> Option { match self { SyncRound::Start(round_start) => round_start.dispatch_requests(dispatcher),