Light protocol syncing improvements (#4212)

* remove old lint silencer

* dispatch requests only to peers with higher TD

* dynamic target for sync rounds

* use round pivots instead of frames, fix test

* fix total difficulty calculation for test client

* fix broken reorg algorithm

* fork test, fix ancestor search
This commit is contained in:
Robert Habermeier 2017-01-20 12:41:59 +01:00 committed by Arkadiy Paronyan
parent 3ff9324ec0
commit a791cb50a6
7 changed files with 254 additions and 80 deletions

View File

@ -144,17 +144,26 @@ impl HeaderChain {
// respective candidates vectors.
if self.best_block.read().total_difficulty < total_difficulty {
let mut canon_hash = hash;
for (_, entry) in candidates.iter_mut().rev().skip_while(|&(height, _)| *height > number) {
if entry.canonical_hash == canon_hash { break; }
for (&height, entry) in candidates.iter_mut().rev().skip_while(|&(height, _)| *height > number) {
if height != number && entry.canonical_hash == canon_hash { break; }
let canon = entry.candidates.iter().find(|x| x.hash == canon_hash)
trace!(target: "chain", "Setting new canonical block {} for block height {}",
canon_hash, height);
let canon_pos = entry.candidates.iter().position(|x| x.hash == canon_hash)
.expect("blocks are only inserted if parent is present; or this is the block we just added; qed");
// move the new canonical entry to the front and set the
// era's canonical hash.
entry.candidates.swap(0, canon_pos);
entry.canonical_hash = canon_hash;
// what about reorgs > cht::SIZE + HISTORY?
// resetting to the last block of a given CHT should be possible.
canon_hash = canon.parent_hash;
canon_hash = entry.candidates[0].parent_hash;
}
trace!(target: "chain", "New best block: ({}, {}), TD {}", number, hash, total_difficulty);
*self.best_block.write() = BlockDescriptor {
hash: hash,
number: number,
@ -360,6 +369,15 @@ mod tests {
}
}
assert_eq!(chain.best_block().number, 12);
let (mut num, mut canon_hash) = (chain.best_block().number, chain.best_block().hash);
assert_eq!(num, 12);
while num > 0 {
let header: Header = ::rlp::decode(&chain.get_header(BlockId::Number(num)).unwrap());
assert_eq!(header.hash(), canon_hash);
canon_hash = *header.parent_hash();
num -= 1;
}
}
}

View File

@ -144,7 +144,7 @@ impl TestBlockChainClient {
genesis_hash: H256::new(),
extra_data: extra_data,
last_hash: RwLock::new(H256::new()),
difficulty: RwLock::new(From::from(0)),
difficulty: RwLock::new(spec.genesis_header().difficulty().clone()),
balances: RwLock::new(HashMap::new()),
nonces: RwLock::new(HashMap::new()),
storage: RwLock::new(HashMap::new()),

View File

@ -177,7 +177,6 @@ pub mod headers {
}
/// A mode for verifying headers.
#[allow(dead_code)]
pub struct Headers;
impl Kind for Headers {

View File

@ -55,13 +55,25 @@ mod sync_round;
mod tests;
/// Peer chain info.
#[derive(Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
struct ChainInfo {
head_td: U256,
head_hash: H256,
head_num: u64,
}
impl PartialOrd for ChainInfo {
fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
self.head_td.partial_cmp(&other.head_td)
}
}
impl Ord for ChainInfo {
fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
self.head_td.cmp(&other.head_td)
}
}
struct Peer {
status: ChainInfo,
}
@ -74,6 +86,7 @@ impl Peer {
}
}
}
// search for a common ancestor with the best chain.
#[derive(Debug)]
enum AncestorSearch {
@ -107,13 +120,18 @@ impl AncestorSearch {
return AncestorSearch::FoundCommon(header.number(), header.hash());
}
if header.number() <= first_num {
if header.number() < first_num {
debug!(target: "sync", "Prehistoric common ancestor with best chain.");
return AncestorSearch::Prehistoric;
}
}
AncestorSearch::Queued(start - headers.len() as u64)
let probe = start - headers.len() as u64;
if probe == 0 {
AncestorSearch::Genesis
} else {
AncestorSearch::Queued(probe)
}
}
Err(e) => {
trace!(target: "sync", "Bad headers response from {}: {}", ctx.responder(), e);
@ -137,12 +155,13 @@ impl AncestorSearch {
match self {
AncestorSearch::Queued(start) => {
let batch_size = ::std::cmp::min(start as usize, BATCH_SIZE);
trace!(target: "sync", "Requesting {} reverse headers from {} to find common ancestor",
BATCH_SIZE, start);
batch_size, start);
let req = request::Headers {
start: start.into(),
max: ::std::cmp::min(start as usize, BATCH_SIZE),
max: batch_size,
skip: 0,
reverse: true,
};
@ -185,7 +204,7 @@ impl<'a> ResponseContext for ResponseCtx<'a> {
/// Light client synchronization manager. See module docs for more details.
pub struct LightSync<L: LightChainClient> {
best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network.
best_seen: Mutex<Option<ChainInfo>>, // best seen block on the network.
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization.
client: Arc<L>,
rng: Mutex<OsRng>,
@ -194,9 +213,7 @@ pub struct LightSync<L: LightChainClient> {
impl<L: LightChainClient> Handler for LightSync<L> {
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
let our_best = self.client.chain_info().best_block_number;
if !capabilities.serve_headers || status.head_num <= our_best {
if !capabilities.serve_headers {
trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer());
ctx.disconnect_peer(ctx.peer());
return;
@ -210,9 +227,7 @@ impl<L: LightChainClient> Handler for LightSync<L> {
{
let mut best = self.best_seen.lock();
if best.as_ref().map_or(true, |b| status.head_td > b.1) {
*best = Some((status.head_hash, status.head_td));
}
*best = ::std::cmp::max(best.clone(), Some(chain_info.clone()));
}
self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info)));
@ -231,17 +246,13 @@ impl<L: LightChainClient> Handler for LightSync<L> {
let new_best = {
let mut best = self.best_seen.lock();
let peer_best = (peer.status.head_hash, peer.status.head_td);
if best.as_ref().map_or(false, |b| b == &peer_best) {
if best.as_ref().map_or(false, |b| b == &peer.status) {
// search for next-best block.
let next_best: Option<(H256, U256)> = self.peers.read().values()
.map(|p| p.lock())
.map(|p| (p.status.head_hash, p.status.head_td))
.fold(None, |acc, x| match acc {
Some(acc) => if x.1 > acc.1 { Some(x) } else { Some(acc) },
None => Some(x),
});
let next_best: Option<ChainInfo> = self.peers.read().values()
.map(|p| p.lock().status.clone())
.map(Some)
.fold(None, ::std::cmp::max);
*best = next_best;
}
@ -266,7 +277,7 @@ impl<L: LightChainClient> Handler for LightSync<L> {
}
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
let last_td = {
let (last_td, chain_info) = {
let peers = self.peers.read();
match peers.get(&ctx.peer()) {
None => return,
@ -278,7 +289,7 @@ impl<L: LightChainClient> Handler for LightSync<L> {
head_hash: announcement.head_hash,
head_num: announcement.head_num,
};
last_td
(last_td, peer.status.clone())
}
}
};
@ -290,13 +301,12 @@ impl<L: LightChainClient> Handler for LightSync<L> {
trace!(target: "sync", "Peer {} moved backwards.", ctx.peer());
self.peers.write().remove(&ctx.peer());
ctx.disconnect_peer(ctx.peer());
return
}
{
let mut best = self.best_seen.lock();
if best.as_ref().map_or(true, |b| announcement.head_td > b.1) {
*best = Some((announcement.head_hash, announcement.head_td));
}
*best = ::std::cmp::max(best.clone(), Some(chain_info));
}
self.maintain_sync(ctx.as_basic());
@ -352,10 +362,12 @@ impl<L: LightChainClient> LightSync<L> {
*state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number));
}
// handles request dispatch, block import, and state machine transitions.
fn maintain_sync(&self, ctx: &BasicContext) {
const DRAIN_AMOUNT: usize = 128;
let mut state = self.state.lock();
let chain_info = self.client.chain_info();
debug!(target: "sync", "Maintaining sync ({:?})", &*state);
// drain any pending blocks into the queue.
@ -364,8 +376,7 @@ impl<L: LightChainClient> LightSync<L> {
'a:
loop {
let queue_info = self.client.queue_info();
if queue_info.is_full() { break }
if self.client.queue_info().is_full() { break }
*state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Rounds(round)
@ -389,12 +400,23 @@ impl<L: LightChainClient> LightSync<L> {
// handle state transitions.
{
let chain_info = self.client.chain_info();
let best_td = chain_info.total_difficulty;
let best_td = chain_info.pending_total_difficulty;
let sync_target = match *self.best_seen.lock() {
Some(ref target) if target.head_td > best_td => (target.head_num, target.head_hash),
_ => {
trace!(target: "sync", "No target to sync to.");
*state = SyncState::Idle;
return;
}
};
match mem::replace(&mut *state, SyncState::Idle) {
_ if self.best_seen.lock().as_ref().map_or(true, |&(_, td)| best_td >= td)
=> *state = SyncState::Idle,
SyncState::Rounds(SyncRound::Abort(reason, _)) => {
SyncState::Rounds(SyncRound::Abort(reason, remaining)) => {
if remaining.len() > 0 {
*state = SyncState::Rounds(SyncRound::Abort(reason, remaining));
return;
}
match reason {
AbortReason::BadScaffold(bad_peers) => {
debug!(target: "sync", "Disabling peers responsible for bad scaffold");
@ -403,20 +425,23 @@ impl<L: LightChainClient> LightSync<L> {
}
}
AbortReason::NoResponses => {}
AbortReason::TargetReached => {
debug!(target: "sync", "Sync target reached. Going idle");
*state = SyncState::Idle;
return;
}
}
debug!(target: "sync", "Beginning search after aborted sync round");
self.begin_search(&mut state);
}
SyncState::AncestorSearch(AncestorSearch::FoundCommon(num, hash)) => {
// TODO: compare to best block and switch to another downloading
// method when close.
*state = SyncState::Rounds(SyncRound::begin(num, hash));
*state = SyncState::Rounds(SyncRound::begin((num, hash), sync_target));
}
SyncState::AncestorSearch(AncestorSearch::Genesis) => {
// Same here.
let g_hash = chain_info.genesis_hash;
*state = SyncState::Rounds(SyncRound::begin(0, g_hash));
*state = SyncState::Rounds(SyncRound::begin((0, g_hash), sync_target));
}
SyncState::Idle => self.begin_search(&mut state),
other => *state = other, // restore displaced state.
@ -424,11 +449,15 @@ impl<L: LightChainClient> LightSync<L> {
}
// allow dispatching of requests.
// TODO: maybe wait until the amount of cumulative requests remaining is high enough
// to avoid pumping the failure rate.
{
let peers = self.peers.read();
let mut peer_ids: Vec<_> = peers.keys().cloned().collect();
let mut peer_ids: Vec<_> = peers.iter().filter_map(|(id, p)| {
if p.lock().status.head_td > chain_info.pending_total_difficulty {
Some(*id)
} else {
None
}
}).collect();
let mut rng = self.rng.lock();
// naive request dispatcher: just give to any peer which says it will

View File

@ -30,13 +30,6 @@ use util::{Bytes, H256};
use super::response;
/// amount of blocks between each scaffold entry.
// TODO: move these into parameters for `RoundStart::new`?
pub const ROUND_SKIP: u64 = 255;
// amount of scaffold frames: these are the blank spaces in "X___X___X"
const ROUND_FRAMES: usize = 255;
// number of attempts to make to get a full scaffold for a sync round.
const SCAFFOLD_ATTEMPTS: usize = 3;
@ -59,6 +52,8 @@ pub enum AbortReason {
BadScaffold(Vec<PeerId>),
/// No incoming data.
NoResponses,
/// Sync rounds completed.
TargetReached,
}
// A request for headers with a known starting header hash.
@ -96,6 +91,7 @@ pub struct Fetcher {
scaffold_contributors: Vec<PeerId>,
ready: VecDeque<Header>,
end: (u64, H256),
target: (u64, H256),
}
impl Fetcher {
@ -103,7 +99,7 @@ impl Fetcher {
// with a list of peers who helped produce the chain.
// The headers must be valid RLP at this point and must have a consistent
// non-zero gap between them. Will abort the round if found wrong.
fn new(sparse_headers: Vec<Header>, contributors: Vec<PeerId>) -> SyncRound {
fn new(sparse_headers: Vec<Header>, contributors: Vec<PeerId>, target: (u64, H256)) -> SyncRound {
let mut requests = BinaryHeap::with_capacity(sparse_headers.len() - 1);
for pair in sparse_headers.windows(2) {
@ -144,6 +140,7 @@ impl Fetcher {
scaffold_contributors: contributors,
ready: VecDeque::new(),
end: end,
target: target,
})
}
@ -170,6 +167,8 @@ impl Fetcher {
if self.sparse.len() == 1 {
self.ready.push_back(self.sparse.pop_back().expect("sparse known to have one entry; qed"))
}
trace!(target: "sync", "{} headers ready to drain", self.ready.len());
}
fn process_response<R: ResponseContext>(mut self, ctx: &R) -> SyncRound {
@ -178,11 +177,16 @@ impl Fetcher {
None => return SyncRound::Fetch(self),
};
trace!(target: "sync", "Received response for subchain ({} -> {})",
request.subchain_parent.0 + 1, request.subchain_end.0);
let headers = ctx.data();
if headers.len() == 0 {
trace!(target: "sync", "Punishing peer {} for empty response", ctx.responder());
ctx.punish_responder();
self.requests.push(request);
return SyncRound::Fetch(self);
}
@ -274,32 +278,66 @@ impl Fetcher {
if self.sparse.is_empty() && self.ready.is_empty() {
trace!(target: "sync", "sync round complete. Starting anew from {:?}", self.end);
SyncRound::Start(RoundStart::new(self.end))
SyncRound::begin(self.end, self.target)
} else {
SyncRound::Fetch(self)
}
}
}
// Compute scaffold parameters from non-zero distance between start and target block: (skip, pivots).
fn scaffold_params(diff: u64) -> (u64, usize) {
// default parameters.
// amount of blocks between each scaffold pivot.
const ROUND_SKIP: u64 = 255;
// amount of scaffold pivots: these are the Xs in "X___X___X"
const ROUND_PIVOTS: usize = 256;
let rem = diff % (ROUND_SKIP + 1);
if diff <= ROUND_SKIP {
// just request headers from the start to the target.
(0, rem as usize)
} else {
// the number of pivots necessary to exactly hit or overshoot the target.
let pivots_to_target = (diff / (ROUND_SKIP + 1)) + if rem == 0 { 0 } else { 1 };
let num_pivots = ::std::cmp::min(pivots_to_target, ROUND_PIVOTS as u64) as usize;
(ROUND_SKIP, num_pivots)
}
}
/// Round started: get stepped header chain.
/// from a start block with number X we request 256 headers stepped by 256 from
/// block X + 1.
/// from a start block with number X we request ROUND_PIVOTS headers stepped by ROUND_SKIP from
/// block X + 1 to a target >= X + 1.
/// If the sync target is within ROUND_SKIP of the start, we request
/// only those blocks. If the sync target is within (ROUND_SKIP + 1) * (ROUND_PIVOTS - 1) of
/// the start, we reduce the number of pivots so the target is outside it.
pub struct RoundStart {
start_block: (u64, H256),
target: (u64, H256),
pending_req: Option<(ReqId, HeadersRequest)>,
sparse_headers: Vec<Header>,
contributors: HashSet<PeerId>,
attempt: usize,
skip: u64,
pivots: usize,
}
impl RoundStart {
fn new(start: (u64, H256)) -> Self {
fn new(start: (u64, H256), target: (u64, H256)) -> Self {
let (skip, pivots) = scaffold_params(target.0 - start.0);
trace!(target: "sync", "Beginning sync round: {} pivots and {} skip from block {}",
pivots, skip, start.0);
RoundStart {
start_block: start.clone(),
start_block: start,
target: target,
pending_req: None,
sparse_headers: Vec::new(),
contributors: HashSet::new(),
attempt: 0,
skip: skip,
pivots: pivots,
}
}
@ -309,10 +347,16 @@ impl RoundStart {
self.attempt += 1;
if self.attempt >= SCAFFOLD_ATTEMPTS {
if self.sparse_headers.len() > 1 {
Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect())
return if self.sparse_headers.len() > 1 {
Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect(), self.target)
} else {
SyncRound::Abort(AbortReason::NoResponses, self.sparse_headers.into())
let fetched_headers = if self.skip == 0 {
self.sparse_headers.into()
} else {
VecDeque::new()
};
SyncRound::abort(AbortReason::NoResponses, fetched_headers)
}
} else {
SyncRound::Start(self)
@ -339,11 +383,18 @@ impl RoundStart {
self.contributors.insert(ctx.responder());
self.sparse_headers.extend(headers);
if self.sparse_headers.len() == ROUND_FRAMES + 1 {
if self.sparse_headers.len() == self.pivots {
return if self.skip == 0 {
SyncRound::abort(AbortReason::TargetReached, self.sparse_headers.into())
} else {
trace!(target: "sync", "Beginning fetch of blocks between {} sparse headers",
self.sparse_headers.len());
return Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect());
Fetcher::new(
self.sparse_headers,
self.contributors.into_iter().collect(),
self.target
)
}
}
}
Err(e) => {
@ -376,20 +427,20 @@ impl RoundStart {
if self.pending_req.is_none() {
// beginning offset + first block expected after last header we have.
let start = (self.start_block.0 + 1)
+ self.sparse_headers.len() as u64 * (ROUND_SKIP + 1);
+ self.sparse_headers.len() as u64 * (self.skip + 1);
let max = (ROUND_FRAMES - 1) - self.sparse_headers.len();
let max = self.pivots - self.sparse_headers.len();
let headers_request = HeadersRequest {
start: start.into(),
max: max,
skip: ROUND_SKIP,
skip: self.skip,
reverse: false,
};
if let Some(req_id) = dispatcher(headers_request.clone()) {
trace!(target: "sync", "Requesting scaffold: {} headers forward from {}, skip={}",
max, start, ROUND_SKIP);
max, start, self.skip);
self.pending_req = Some((req_id, headers_request));
}
@ -411,14 +462,18 @@ pub enum SyncRound {
impl SyncRound {
fn abort(reason: AbortReason, remaining: VecDeque<Header>) -> Self {
trace!(target: "sync", "Aborting sync round: {:?}. To drain: {:?}", reason, remaining);
trace!(target: "sync", "Aborting sync round: {:?}. To drain: {}", reason, remaining.len());
SyncRound::Abort(reason, remaining)
}
/// Begin sync rounds from a starting block.
pub fn begin(num: u64, hash: H256) -> Self {
SyncRound::Start(RoundStart::new((num, hash)))
/// Begin sync rounds from a starting block, but not to go past a given target
pub fn begin(start: (u64, H256), target: (u64, H256)) -> Self {
if target.0 <= start.0 {
SyncRound::abort(AbortReason::TargetReached, VecDeque::new())
} else {
SyncRound::Start(RoundStart::new(start, target))
}
}
/// Process an answer to a request. Unknown requests will be ignored.
@ -478,3 +533,21 @@ impl fmt::Debug for SyncRound {
}
}
}
#[cfg(test)]
mod tests {
use super::scaffold_params;
#[test]
fn scaffold_config() {
// within a certain distance of the head, we download
// sequentially.
assert_eq!(scaffold_params(1), (0, 1));
assert_eq!(scaffold_params(6), (0, 6));
// when scaffolds are useful, download enough frames to get
// within a close distance of the goal.
assert_eq!(scaffold_params(1000), (255, 4));
assert_eq!(scaffold_params(1024), (255, 4));
}
}

View File

@ -14,6 +14,50 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
#![allow(dead_code)]
use tests::helpers::TestNet;
use ethcore::client::{BlockChainClient, BlockId, EachBlockWith};
mod test_net;
#[test]
fn basic_sync() {
let mut net = TestNet::light(1, 2);
net.peer(1).chain().add_blocks(5000, EachBlockWith::Nothing);
net.peer(2).chain().add_blocks(6000, EachBlockWith::Nothing);
net.sync();
assert!(net.peer(0).light_chain().get_header(BlockId::Number(6000)).is_some());
}
#[test]
fn fork_post_cht() {
const CHAIN_LENGTH: u64 = 50; // shouldn't be longer than ::light::cht::size();
let mut net = TestNet::light(1, 2);
// peer 2 is on a higher TD chain.
net.peer(1).chain().add_blocks(CHAIN_LENGTH as usize, EachBlockWith::Nothing);
net.peer(2).chain().add_blocks(CHAIN_LENGTH as usize + 1, EachBlockWith::Uncle);
// get the light peer on peer 1's chain.
for id in (0..CHAIN_LENGTH).map(|x| x + 1).map(BlockId::Number) {
let (light_peer, full_peer) = (net.peer(0), net.peer(1));
let light_chain = light_peer.light_chain();
let header = full_peer.chain().block_header(id).unwrap().decode();
let _ = light_chain.import_header(header);
light_chain.flush_queue();
light_chain.import_verified();
assert!(light_chain.get_header(id).is_some());
}
net.sync();
for id in (0..CHAIN_LENGTH).map(|x| x + 1).map(BlockId::Number) {
assert_eq!(
net.peer(0).light_chain().get_header(id),
net.peer(2).chain().block_header(id).map(|h| h.into_inner())
);
}
}

View File

@ -174,13 +174,24 @@ impl PeerLike for Peer {
}
fn is_done(&self) -> bool {
self.queue.read().is_empty()
self.queue.read().is_empty() && match self.data {
PeerData::Light(_, ref client) => {
// should create a test light client which just imports
// headers directly and doesn't have a queue to drain.
client.import_verified();
client.queue_info().is_empty()
}
_ => true,
}
}
fn sync_step(&self) {
if let PeerData::Light(_, ref client) = self.data {
client.flush_queue();
client.import_verified();
while !client.queue_info().is_empty() {
client.import_verified()
}
}
}