Merge pull request #3892 from ethcore/lightsync

Naive light client synchronization
This commit is contained in:
Robert Habermeier 2017-01-05 13:17:14 +01:00 committed by GitHub
commit f1dd96ca5d
21 changed files with 2163 additions and 190 deletions

7
Cargo.lock generated
View File

@ -522,6 +522,7 @@ dependencies = [
"ethcore-util 1.5.0",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"rlp 0.1.0",
"smallvec 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -2000,6 +2001,11 @@ name = "smallvec"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "smallvec"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "solicit"
version = "0.4.4"
@ -2514,6 +2520,7 @@ dependencies = [
"checksum slab 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6dbdd334bd28d328dad1c41b0ea662517883d8880d8533895ef96c8003dec9c4"
"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
"checksum smallvec 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fcc8d19212aacecf95e4a7a2179b26f7aeb9732a915cf01f05b0d3e044865410"
"checksum smallvec 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3a3c84984c278afe61a46e19868e8b23e2ee3be5b3cc6dea6edad4893bc6c841"
"checksum solicit 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "172382bac9424588d7840732b250faeeef88942e37b6e35317dce98cafdd75b2"
"checksum spmc 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "93bdab61c1a413e591c4d17388ffa859eaff2df27f1e13a5ec8b716700605adf"
"checksum stable-heap 0.1.0 (git+https://github.com/carllerche/stable-heap?rev=3c5cd1ca47)" = "<none>"

View File

@ -19,6 +19,7 @@ ethcore-io = { path = "../../util/io" }
ethcore-ipc = { path = "../../ipc/rpc", optional = true }
rlp = { path = "../../util/rlp" }
time = "0.1"
smallvec = "0.3.1"
[features]
default = []

View File

@ -1,121 +0,0 @@
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Light client implementation. Stores data from light sync
use std::sync::Arc;
use ethcore::engines::Engine;
use ethcore::ids::BlockId;
use ethcore::service::ClientIoMessage;
use ethcore::block_import_error::BlockImportError;
use ethcore::block_status::BlockStatus;
use ethcore::verification::queue::{HeaderQueue, QueueInfo};
use ethcore::transaction::{SignedTransaction, PendingTransaction};
use ethcore::blockchain_info::BlockChainInfo;
use ethcore::encoded;
use io::IoChannel;
use util::hash::{H256, H256FastMap};
use util::{Bytes, Mutex};
use provider::Provider;
use request;
/// Light client implementation.
pub struct Client {
_engine: Arc<Engine>,
header_queue: HeaderQueue,
_message_channel: Mutex<IoChannel<ClientIoMessage>>,
tx_pool: Mutex<H256FastMap<SignedTransaction>>,
}
impl Client {
/// Import a header as rlp-encoded bytes.
pub fn import_header(&self, bytes: Bytes) -> Result<H256, BlockImportError> {
let header = ::rlp::decode(&bytes);
self.header_queue.import(header).map_err(Into::into)
}
/// Whether the block is already known (but not necessarily part of the canonical chain)
pub fn is_known(&self, _id: BlockId) -> bool {
false
}
/// Import a local transaction.
pub fn import_own_transaction(&self, tx: SignedTransaction) {
self.tx_pool.lock().insert(tx.hash(), tx);
}
/// Fetch a vector of all pending transactions.
pub fn pending_transactions(&self) -> Vec<SignedTransaction> {
self.tx_pool.lock().values().cloned().collect()
}
/// Inquire about the status of a given block (or header).
pub fn status(&self, _id: BlockId) -> BlockStatus {
BlockStatus::Unknown
}
/// Get the header queue info.
pub fn queue_info(&self) -> QueueInfo {
self.header_queue.queue_info()
}
}
// dummy implementation -- may draw from canonical cache further on.
impl Provider for Client {
fn chain_info(&self) -> BlockChainInfo {
unimplemented!()
}
fn reorg_depth(&self, _a: &H256, _b: &H256) -> Option<u64> {
None
}
fn earliest_state(&self) -> Option<u64> {
None
}
fn block_header(&self, _id: BlockId) -> Option<encoded::Header> {
None
}
fn block_body(&self, _id: BlockId) -> Option<encoded::Body> {
None
}
fn block_receipts(&self, _hash: &H256) -> Option<Bytes> {
None
}
fn state_proof(&self, _req: request::StateProof) -> Vec<Bytes> {
Vec::new()
}
fn contract_code(&self, _req: request::ContractCode) -> Bytes {
Vec::new()
}
fn header_proof(&self, _req: request::HeaderProof) -> Option<(encoded::Header, Vec<Bytes>)> {
None
}
fn ready_transactions(&self) -> Vec<PendingTransaction> {
Vec::new()
}
}

View File

@ -0,0 +1,22 @@
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//! Canonical hash trie definitions and helper functions.
/// The size of each CHT.
pub const SIZE: u64 = 2048;
/// Convert a block number to a CHT number.
pub fn block_to_cht_number(block_num: u64) -> u64 {
(block_num + 1) / SIZE
}

View File

@ -0,0 +1,365 @@
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Light client header chain.
//!
//! Unlike a full node's `BlockChain` this doesn't store much in the database.
//! It stores candidates for the last 2048-4096 blocks as well as CHT roots for
//! historical blocks all the way to the genesis.
//!
//! This is separate from the `BlockChain` for two reasons:
//! - It stores only headers (and a pruned subset of them)
//! - To allow for flexibility in the database layout once that's incorporated.
// TODO: use DB instead of memory. DB Layout: just the contents of `candidates`/`headers`
//
use std::collections::{BTreeMap, HashMap};
use client::cht;
use ethcore::block_status::BlockStatus;
use ethcore::error::BlockError;
use ethcore::ids::BlockId;
use ethcore::views::HeaderView;
use util::{Bytes, H256, U256, HeapSizeOf, Mutex, RwLock};
use smallvec::SmallVec;
/// Store at least this many candidate headers at all times.
/// Also functions as the delay for computing CHTs as they aren't
/// relevant to any blocks we've got in memory.
const HISTORY: u64 = 2048;
/// Information about a block.
#[derive(Debug, Clone)]
pub struct BlockDescriptor {
/// The block's hash
pub hash: H256,
/// The block's number
pub number: u64,
/// The block's total difficulty.
pub total_difficulty: U256,
}
// candidate block description.
struct Candidate {
hash: H256,
parent_hash: H256,
total_difficulty: U256,
}
struct Entry {
candidates: SmallVec<[Candidate; 3]>, // 3 arbitrarily chosen
canonical_hash: H256,
}
impl HeapSizeOf for Entry {
fn heap_size_of_children(&self) -> usize {
match self.candidates.spilled() {
false => 0,
true => self.candidates.capacity() * ::std::mem::size_of::<Candidate>(),
}
}
}
/// Header chain. See module docs for more details.
pub struct HeaderChain {
genesis_header: Bytes, // special-case the genesis.
candidates: RwLock<BTreeMap<u64, Entry>>,
headers: RwLock<HashMap<H256, Bytes>>,
best_block: RwLock<BlockDescriptor>,
cht_roots: Mutex<Vec<H256>>,
}
impl HeaderChain {
/// Create a new header chain given this genesis block.
pub fn new(genesis: &[u8]) -> Self {
let g_view = HeaderView::new(genesis);
HeaderChain {
genesis_header: genesis.to_owned(),
best_block: RwLock::new(BlockDescriptor {
hash: g_view.hash(),
number: 0,
total_difficulty: g_view.difficulty(),
}),
candidates: RwLock::new(BTreeMap::new()),
headers: RwLock::new(HashMap::new()),
cht_roots: Mutex::new(Vec::new()),
}
}
/// Insert a pre-verified header.
///
/// This blindly trusts that the data given to it is
/// a) valid RLP encoding of a header and
/// b) has sensible data contained within it.
pub fn insert(&self, header: Bytes) -> Result<(), BlockError> {
let view = HeaderView::new(&header);
let hash = view.hash();
let number = view.number();
let parent_hash = view.parent_hash();
// hold candidates the whole time to guard import order.
let mut candidates = self.candidates.write();
// find parent details.
let parent_td =
if number == 1 {
let g_view = HeaderView::new(&self.genesis_header);
g_view.difficulty()
} else {
candidates.get(&(number - 1))
.and_then(|entry| entry.candidates.iter().find(|c| c.hash == parent_hash))
.map(|c| c.total_difficulty)
.ok_or_else(|| BlockError::UnknownParent(parent_hash))?
};
let total_difficulty = parent_td + view.difficulty();
// insert headers and candidates entries.
candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash})
.candidates.push(Candidate {
hash: hash,
parent_hash: parent_hash,
total_difficulty: total_difficulty,
});
self.headers.write().insert(hash, header.clone());
// reorganize ancestors so canonical entries are first in their
// respective candidates vectors.
if self.best_block.read().total_difficulty < total_difficulty {
let mut canon_hash = hash;
for (_, entry) in candidates.iter_mut().rev().skip_while(|&(height, _)| *height > number) {
if entry.canonical_hash == canon_hash { break; }
let canon = entry.candidates.iter().find(|x| x.hash == canon_hash)
.expect("blocks are only inserted if parent is present; or this is the block we just added; qed");
// what about reorgs > cht::SIZE + HISTORY?
// resetting to the last block of a given CHT should be possible.
canon_hash = canon.parent_hash;
}
*self.best_block.write() = BlockDescriptor {
hash: hash,
number: number,
total_difficulty: total_difficulty,
};
// produce next CHT root if it's time.
let earliest_era = *candidates.keys().next().expect("at least one era just created; qed");
if earliest_era + HISTORY + cht::SIZE <= number {
let mut values = Vec::with_capacity(cht::SIZE as usize);
{
let mut headers = self.headers.write();
for i in (0..cht::SIZE).map(|x| x + earliest_era) {
let era_entry = candidates.remove(&i)
.expect("all eras are sequential with no gaps; qed");
for ancient in &era_entry.candidates {
headers.remove(&ancient.hash);
}
values.push((
::rlp::encode(&i).to_vec(),
::rlp::encode(&era_entry.canonical_hash).to_vec(),
));
}
}
let cht_root = ::util::triehash::trie_root(values);
debug!(target: "chain", "Produced CHT {} root: {:?}", (earliest_era - 1) % cht::SIZE, cht_root);
self.cht_roots.lock().push(cht_root);
}
}
Ok(())
}
/// Get a block header. In the case of query by number, only canonical blocks
/// will be returned.
pub fn get_header(&self, id: BlockId) -> Option<Bytes> {
match id {
BlockId::Earliest | BlockId::Number(0) => Some(self.genesis_header.clone()),
BlockId::Hash(hash) => self.headers.read().get(&hash).map(|x| x.to_vec()),
BlockId::Number(num) => {
if self.best_block.read().number < num { return None }
self.candidates.read().get(&num).map(|entry| entry.canonical_hash)
.and_then(|hash| self.headers.read().get(&hash).map(|x| x.to_vec()))
}
BlockId::Latest | BlockId::Pending => {
let hash = self.best_block.read().hash;
self.headers.read().get(&hash).map(|x| x.to_vec())
}
}
}
/// Get the nth CHT root, if it's been computed.
///
/// CHT root 0 is from block `1..2048`.
/// CHT root 1 is from block `2049..4096`
/// and so on.
///
/// This is because it's assumed that the genesis hash is known,
/// so including it within a CHT would be redundant.
pub fn cht_root(&self, n: usize) -> Option<H256> {
self.cht_roots.lock().get(n).map(|h| h.clone())
}
/// Get the genesis hash.
pub fn genesis_hash(&self) -> H256 {
::util::Hashable::sha3(&self.genesis_header)
}
/// Get the best block's data.
pub fn best_block(&self) -> BlockDescriptor {
self.best_block.read().clone()
}
/// If there is a gap between the genesis and the rest
/// of the stored blocks, return the first post-gap block.
pub fn first_block(&self) -> Option<BlockDescriptor> {
let candidates = self.candidates.read();
match candidates.iter().next() {
None | Some((&1, _)) => None,
Some((&height, entry)) => Some(BlockDescriptor {
number: height,
hash: entry.canonical_hash,
total_difficulty: entry.candidates.iter().find(|x| x.hash == entry.canonical_hash)
.expect("entry always stores canonical candidate; qed").total_difficulty,
})
}
}
/// Get block status.
pub fn status(&self, hash: &H256) -> BlockStatus {
match self.headers.read().contains_key(hash) {
true => BlockStatus::InChain,
false => BlockStatus::Unknown,
}
}
}
impl HeapSizeOf for HeaderChain {
fn heap_size_of_children(&self) -> usize {
self.candidates.read().heap_size_of_children() +
self.headers.read().heap_size_of_children() +
self.cht_roots.lock().heap_size_of_children()
}
}
#[cfg(test)]
mod tests {
use super::HeaderChain;
use ethcore::ids::BlockId;
use ethcore::header::Header;
use ethcore::spec::Spec;
#[test]
fn basic_chain() {
let spec = Spec::new_test();
let genesis_header = spec.genesis_header();
let chain = HeaderChain::new(&::rlp::encode(&genesis_header));
let mut parent_hash = genesis_header.hash();
let mut rolling_timestamp = genesis_header.timestamp();
for i in 1..10000 {
let mut header = Header::new();
header.set_parent_hash(parent_hash);
header.set_number(i);
header.set_timestamp(rolling_timestamp);
header.set_difficulty(*genesis_header.difficulty() * i.into());
chain.insert(::rlp::encode(&header).to_vec()).unwrap();
parent_hash = header.hash();
rolling_timestamp += 10;
}
assert!(chain.get_header(BlockId::Number(10)).is_none());
assert!(chain.get_header(BlockId::Number(9000)).is_some());
assert!(chain.cht_root(2).is_some());
assert!(chain.cht_root(3).is_none());
}
#[test]
fn reorganize() {
let spec = Spec::new_test();
let genesis_header = spec.genesis_header();
let chain = HeaderChain::new(&::rlp::encode(&genesis_header));
let mut parent_hash = genesis_header.hash();
let mut rolling_timestamp = genesis_header.timestamp();
for i in 1..6 {
let mut header = Header::new();
header.set_parent_hash(parent_hash);
header.set_number(i);
header.set_timestamp(rolling_timestamp);
header.set_difficulty(*genesis_header.difficulty() * i.into());
chain.insert(::rlp::encode(&header).to_vec()).unwrap();
parent_hash = header.hash();
rolling_timestamp += 10;
}
{
let mut rolling_timestamp = rolling_timestamp;
let mut parent_hash = parent_hash;
for i in 6..16 {
let mut header = Header::new();
header.set_parent_hash(parent_hash);
header.set_number(i);
header.set_timestamp(rolling_timestamp);
header.set_difficulty(*genesis_header.difficulty() * i.into());
chain.insert(::rlp::encode(&header).to_vec()).unwrap();
parent_hash = header.hash();
rolling_timestamp += 10;
}
}
assert_eq!(chain.best_block().number, 15);
{
let mut rolling_timestamp = rolling_timestamp;
let mut parent_hash = parent_hash;
// import a shorter chain which has better TD.
for i in 6..13 {
let mut header = Header::new();
header.set_parent_hash(parent_hash);
header.set_number(i);
header.set_timestamp(rolling_timestamp);
header.set_difficulty(*genesis_header.difficulty() * (i * i).into());
chain.insert(::rlp::encode(&header).to_vec()).unwrap();
parent_hash = header.hash();
rolling_timestamp += 11;
}
}
assert_eq!(chain.best_block().number, 12);
}
}

View File

@ -0,0 +1,264 @@
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Light client implementation. Stores data from light sync
use ethcore::block_import_error::BlockImportError;
use ethcore::block_status::BlockStatus;
use ethcore::client::ClientReport;
use ethcore::ids::BlockId;
use ethcore::header::Header;
use ethcore::verification::queue::{self, HeaderQueue};
use ethcore::transaction::PendingTransaction;
use ethcore::blockchain_info::BlockChainInfo;
use ethcore::spec::Spec;
use ethcore::service::ClientIoMessage;
use ethcore::encoded;
use io::IoChannel;
use util::hash::{H256, H256FastMap};
use util::{Bytes, Mutex, RwLock};
use provider::Provider;
use request;
use self::header_chain::HeaderChain;
pub use self::service::Service;
pub mod cht;
mod header_chain;
mod service;
/// Configuration for the light client.
#[derive(Debug, Default, Clone)]
pub struct Config {
/// Verification queue config.
pub queue: queue::Config,
}
/// Trait for interacting with the header chain abstractly.
pub trait LightChainClient: Send + Sync {
/// Get chain info.
fn chain_info(&self) -> BlockChainInfo;
/// Queue header to be verified. Required that all headers queued have their
/// parent queued prior.
fn queue_header(&self, header: Header) -> Result<H256, BlockImportError>;
/// Query whether a block is known.
fn is_known(&self, hash: &H256) -> bool;
/// Clear the queue.
fn clear_queue(&self);
/// Get queue info.
fn queue_info(&self) -> queue::QueueInfo;
/// Get the `i`th CHT root.
fn cht_root(&self, i: usize) -> Option<H256>;
}
/// Light client implementation.
pub struct Client {
queue: HeaderQueue,
chain: HeaderChain,
tx_pool: Mutex<H256FastMap<PendingTransaction>>,
report: RwLock<ClientReport>,
import_lock: Mutex<()>,
}
impl Client {
/// Create a new `Client`.
pub fn new(config: Config, spec: &Spec, io_channel: IoChannel<ClientIoMessage>) -> Self {
Client {
queue: HeaderQueue::new(config.queue, spec.engine.clone(), io_channel, true),
chain: HeaderChain::new(&::rlp::encode(&spec.genesis_header())),
tx_pool: Mutex::new(Default::default()),
report: RwLock::new(ClientReport::default()),
import_lock: Mutex::new(()),
}
}
/// Import a header to the queue for additional verification.
pub fn import_header(&self, header: Header) -> Result<H256, BlockImportError> {
self.queue.import(header).map_err(Into::into)
}
/// Import a local transaction.
pub fn import_own_transaction(&self, tx: PendingTransaction) {
self.tx_pool.lock().insert(tx.transaction.hash(), tx);
}
/// Fetch a vector of all pending transactions.
pub fn ready_transactions(&self) -> Vec<PendingTransaction> {
let best_num = self.chain.best_block().number;
self.tx_pool.lock()
.values()
.filter(|t| t.min_block.as_ref().map_or(true, |x| x <= &best_num))
.cloned()
.collect()
}
/// Inquire about the status of a given header.
pub fn status(&self, hash: &H256) -> BlockStatus {
match self.queue.status(hash) {
queue::Status::Unknown => self.chain.status(hash),
other => other.into(),
}
}
/// Get the chain info.
pub fn chain_info(&self) -> BlockChainInfo {
let best_block = self.chain.best_block();
let first_block = self.chain.first_block();
let genesis_hash = self.chain.genesis_hash();
BlockChainInfo {
total_difficulty: best_block.total_difficulty,
pending_total_difficulty: best_block.total_difficulty,
genesis_hash: genesis_hash,
best_block_hash: best_block.hash,
best_block_number: best_block.number,
ancient_block_hash: if first_block.is_some() { Some(genesis_hash) } else { None },
ancient_block_number: if first_block.is_some() { Some(0) } else { None },
first_block_hash: first_block.as_ref().map(|first| first.hash),
first_block_number: first_block.as_ref().map(|first| first.number),
}
}
/// Get the header queue info.
pub fn queue_info(&self) -> queue::QueueInfo {
self.queue.queue_info()
}
/// Get a block header by Id.
pub fn get_header(&self, id: BlockId) -> Option<Bytes> {
self.chain.get_header(id)
}
/// Get the `i`th CHT root.
pub fn cht_root(&self, i: usize) -> Option<H256> {
self.chain.cht_root(i)
}
/// Import a set of pre-verified headers from the queue.
pub fn import_verified(&self) {
const MAX: usize = 256;
let _lock = self.import_lock.lock();
let mut bad = Vec::new();
let mut good = Vec::new();
for verified_header in self.queue.drain(MAX) {
let (num, hash) = (verified_header.number(), verified_header.hash());
match self.chain.insert(::rlp::encode(&verified_header).to_vec()) {
Ok(()) => {
good.push(hash);
self.report.write().blocks_imported += 1;
}
Err(e) => {
debug!(target: "client", "Error importing header {:?}: {}", (num, hash), e);
bad.push(hash);
}
}
}
self.queue.mark_as_bad(&bad);
self.queue.mark_as_good(&good);
}
/// Get a report about blocks imported.
pub fn report(&self) -> ClientReport {
::std::mem::replace(&mut *self.report.write(), ClientReport::default())
}
/// Get blockchain mem usage in bytes.
pub fn chain_mem_used(&self) -> usize {
use util::HeapSizeOf;
self.chain.heap_size_of_children()
}
}
impl LightChainClient for Client {
fn chain_info(&self) -> BlockChainInfo { Client::chain_info(self) }
fn queue_header(&self, header: Header) -> Result<H256, BlockImportError> {
self.import_header(header)
}
fn is_known(&self, hash: &H256) -> bool {
self.status(hash) == BlockStatus::InChain
}
fn clear_queue(&self) {
self.queue.clear()
}
fn queue_info(&self) -> queue::QueueInfo {
self.queue.queue_info()
}
fn cht_root(&self, i: usize) -> Option<H256> {
Client::cht_root(self, i)
}
}
// dummy implementation -- may draw from canonical cache further on.
impl Provider for Client {
fn chain_info(&self) -> BlockChainInfo {
Client::chain_info(self)
}
fn reorg_depth(&self, _a: &H256, _b: &H256) -> Option<u64> {
None
}
fn earliest_state(&self) -> Option<u64> {
None
}
fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
self.chain.get_header(id).map(encoded::Header::new)
}
fn block_body(&self, _id: BlockId) -> Option<encoded::Body> {
None
}
fn block_receipts(&self, _hash: &H256) -> Option<Bytes> {
None
}
fn state_proof(&self, _req: request::StateProof) -> Vec<Bytes> {
Vec::new()
}
fn contract_code(&self, _req: request::ContractCode) -> Bytes {
Vec::new()
}
fn header_proof(&self, _req: request::HeaderProof) -> Option<(encoded::Header, Vec<Bytes>)> {
None
}
fn ready_transactions(&self) -> Vec<PendingTransaction> {
Vec::new()
}
}

View File

@ -0,0 +1,73 @@
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Minimal IO service for light client.
//! Just handles block import messages and passes them to the client.
use std::sync::Arc;
use ethcore::service::ClientIoMessage;
use ethcore::spec::Spec;
use io::{IoContext, IoError, IoHandler, IoService};
use super::{Client, Config as ClientConfig};
/// Light client service.
pub struct Service {
client: Arc<Client>,
_io_service: IoService<ClientIoMessage>,
}
impl Service {
/// Start the service: initialize I/O workers and client itself.
pub fn start(config: ClientConfig, spec: &Spec) -> Result<Self, IoError> {
let io_service = try!(IoService::<ClientIoMessage>::start());
let client = Arc::new(Client::new(config, spec, io_service.channel()));
try!(io_service.register_handler(Arc::new(ImportBlocks(client.clone()))));
Ok(Service {
client: client,
_io_service: io_service,
})
}
/// Get a handle to the client.
pub fn client(&self) -> &Arc<Client> {
&self.client
}
}
struct ImportBlocks(Arc<Client>);
impl IoHandler<ClientIoMessage> for ImportBlocks {
fn message(&self, _io: &IoContext<ClientIoMessage>, message: &ClientIoMessage) {
if let ClientIoMessage::BlockVerified = *message {
self.0.import_verified();
}
}
}
#[cfg(test)]
mod tests {
use super::Service;
use ethcore::spec::Spec;
#[test]
fn it_works() {
let spec = Spec::new_test();
Service::start(Default::default(), &spec).unwrap();
}
}

View File

@ -25,8 +25,10 @@
//! low-latency applications, but perfectly suitable for simple everyday
//! use-cases like sending transactions from a personal account.
//!
//! It starts by performing a header-only sync, verifying random samples
//! of members of the chain to varying degrees.
//! The light client performs a header-only sync, doing verification and pruning
//! historical blocks. Upon pruning, batches of 2048 blocks have a number => hash
//! mapping sealed into "canonical hash tries" which can later be used to verify
//! historical block queries from peers.
#![deny(missing_docs)]
@ -60,6 +62,7 @@ extern crate ethcore_util as util;
extern crate ethcore_network as network;
extern crate ethcore_io as io;
extern crate rlp;
extern crate smallvec;
extern crate time;
#[cfg(feature = "ipc")]

View File

@ -20,7 +20,7 @@ use network::{NetworkContext, PeerId, NodeId};
use super::{Announcement, LightProtocol, ReqId};
use super::error::Error;
use request::Request;
use request::{self, Request};
/// An I/O context which allows sending and receiving packets as well as
/// disconnecting peers. This is used as a generalization of the portions
@ -77,12 +77,8 @@ impl<'a> IoContext for NetworkContext<'a> {
}
}
/// Context for a protocol event.
pub trait EventContext {
/// Get the peer relevant to the event e.g. message sender,
/// disconnected/connected peer.
fn peer(&self) -> PeerId;
/// Basic context for a the protocol.
pub trait BasicContext {
/// Returns the relevant's peer persistent Id (aka NodeId).
fn persistent_peer_id(&self, peer: PeerId) -> Option<NodeId>;
@ -93,6 +89,10 @@ pub trait EventContext {
// TODO: maybe just put this on a timer in LightProtocol?
fn make_announcement(&self, announcement: Announcement);
/// Find the maximum number of requests of a specific type which can be made from
/// supplied peer.
fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize;
/// Disconnect a peer.
fn disconnect_peer(&self, peer: PeerId);
@ -100,6 +100,50 @@ pub trait EventContext {
fn disable_peer(&self, peer: PeerId);
}
/// Context for a protocol event which has a peer ID attached.
pub trait EventContext: BasicContext {
/// Get the peer relevant to the event e.g. message sender,
/// disconnected/connected peer.
fn peer(&self) -> PeerId;
/// Treat the event context as a basic context.
fn as_basic(&self) -> &BasicContext;
}
/// Basic context.
pub struct TickCtx<'a> {
/// Io context to enable dispatch.
pub io: &'a IoContext,
/// Protocol implementation.
pub proto: &'a LightProtocol,
}
impl<'a> BasicContext for TickCtx<'a> {
fn persistent_peer_id(&self, id: PeerId) -> Option<NodeId> {
self.io.persistent_peer_id(id)
}
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error> {
self.proto.request_from(self.io, &peer, request)
}
fn make_announcement(&self, announcement: Announcement) {
self.proto.make_announcement(self.io, announcement);
}
fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize {
self.proto.max_requests(peer, kind)
}
fn disconnect_peer(&self, peer: PeerId) {
self.io.disconnect_peer(peer);
}
fn disable_peer(&self, peer: PeerId) {
self.io.disable_peer(peer);
}
}
/// Concrete implementation of `EventContext` over the light protocol struct and
/// an io context.
pub struct Ctx<'a> {
@ -111,11 +155,7 @@ pub struct Ctx<'a> {
pub peer: PeerId,
}
impl<'a> EventContext for Ctx<'a> {
fn peer(&self) -> PeerId {
self.peer
}
impl<'a> BasicContext for Ctx<'a> {
fn persistent_peer_id(&self, id: PeerId) -> Option<NodeId> {
self.io.persistent_peer_id(id)
}
@ -128,6 +168,10 @@ impl<'a> EventContext for Ctx<'a> {
self.proto.make_announcement(self.io, announcement);
}
fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize {
self.proto.max_requests(peer, kind)
}
fn disconnect_peer(&self, peer: PeerId) {
self.io.disconnect_peer(peer);
}
@ -136,3 +180,13 @@ impl<'a> EventContext for Ctx<'a> {
self.io.disable_peer(peer);
}
}
impl<'a> EventContext for Ctx<'a> {
fn peer(&self) -> PeerId {
self.peer
}
fn as_basic(&self) -> &BasicContext {
&*self
}
}

View File

@ -62,6 +62,8 @@ pub enum Error {
UnsupportedProtocolVersion(u8),
/// Bad protocol version.
BadProtocolVersion,
/// Peer is overburdened.
Overburdened,
}
impl Error {
@ -79,6 +81,7 @@ impl Error {
Error::NotServer => Punishment::Disable,
Error::UnsupportedProtocolVersion(_) => Punishment::Disable,
Error::BadProtocolVersion => Punishment::Disable,
Error::Overburdened => Punishment::None,
}
}
}
@ -109,6 +112,7 @@ impl fmt::Display for Error {
Error::NotServer => write!(f, "Peer not a server."),
Error::UnsupportedProtocolVersion(pv) => write!(f, "Unsupported protocol version: {}", pv),
Error::BadProtocolVersion => write!(f, "Bad protocol version in handshake"),
Error::Overburdened => write!(f, "Peer overburdened"),
}
}
}

View File

@ -37,8 +37,8 @@ use provider::Provider;
use request::{self, HashOrNumber, Request};
use self::buffer_flow::{Buffer, FlowParams};
use self::context::Ctx;
use self::error::{Error, Punishment};
use self::context::{Ctx, TickCtx};
use self::error::Punishment;
mod buffer_flow;
mod context;
@ -48,12 +48,16 @@ mod status;
#[cfg(test)]
mod tests;
pub use self::context::{EventContext, IoContext};
pub use self::error::Error;
pub use self::context::{BasicContext, EventContext, IoContext};
pub use self::status::{Status, Capabilities, Announcement};
const TIMEOUT: TimerToken = 0;
const TIMEOUT_INTERVAL_MS: u64 = 1000;
const TICK_TIMEOUT: TimerToken = 1;
const TICK_TIMEOUT_INTERVAL_MS: u64 = 5000;
// minimum interval between updates.
const UPDATE_INTERVAL_MS: i64 = 5000;
@ -131,8 +135,9 @@ struct Peer {
status: Status,
capabilities: Capabilities,
remote_flow: Option<(Buffer, FlowParams)>,
sent_head: H256, // last head we've given them.
sent_head: H256, // last chain head we've given them.
last_update: SteadyTime,
idle: bool, // make into a current percentage of max buffer being requested?
}
impl Peer {
@ -188,7 +193,11 @@ pub trait Handler: Send + Sync {
/// Called when a peer responds with header proofs. Each proof is a block header coupled
/// with a series of trie nodes is ascending order by distance from the root.
fn on_header_proofs(&self, _ctx: &EventContext, _req_id: ReqId, _proofs: &[(Bytes, Vec<Bytes>)]) { }
/// Called on abort.
/// Called to "tick" the handler periodically.
fn tick(&self, _ctx: &BasicContext) { }
/// Called on abort. This signals to handlers that they should clean up
/// and ignore peers.
// TODO: coreresponding `on_activate`?
fn on_abort(&self) { }
}
@ -253,18 +262,25 @@ impl LightProtocol {
}
/// Check the maximum amount of requests of a specific type
/// which a peer would be able to serve.
pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> Option<usize> {
/// which a peer would be able to serve. Returns zero if the
/// peer is unknown or has no buffer flow parameters.
fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize {
self.peers.read().get(&peer).and_then(|peer| {
let mut peer = peer.lock();
match peer.remote_flow.as_mut() {
Some(&mut (ref mut buf, ref flow)) => {
let idle = peer.idle;
match peer.remote_flow {
Some((ref mut buf, ref flow)) => {
flow.recharge(buf);
Some(flow.max_amount(&*buf, kind))
if !idle {
Some(0)
} else {
Some(flow.max_amount(&*buf, kind))
}
}
None => None,
}
})
}).unwrap_or(0)
}
/// Make a request to a peer.
@ -278,8 +294,10 @@ impl LightProtocol {
let peer = peers.get(peer_id).ok_or_else(|| Error::UnknownPeer)?;
let mut peer = peer.lock();
match peer.remote_flow.as_mut() {
Some(&mut (ref mut buf, ref flow)) => {
if !peer.idle { return Err(Error::Overburdened) }
match peer.remote_flow {
Some((ref mut buf, ref flow)) => {
flow.recharge(buf);
let max = flow.compute_cost(request.kind(), request.amount());
buf.deduct_cost(max)?;
@ -290,6 +308,8 @@ impl LightProtocol {
let req_id = self.req_id.fetch_add(1, Ordering::SeqCst);
let packet_data = encode_request(&request, req_id);
trace!(target: "les", "Dispatching request {} to peer {}", req_id, peer_id);
let packet_id = match request.kind() {
request::Kind::Headers => packet::GET_BLOCK_HEADERS,
request::Kind::Bodies => packet::GET_BLOCK_BODIES,
@ -301,6 +321,7 @@ impl LightProtocol {
io.send(*peer_id, packet_id, packet_data);
peer.idle = false;
self.pending_requests.write().insert(req_id, Requested {
request: request,
timestamp: SteadyTime::now(),
@ -404,6 +425,8 @@ impl LightProtocol {
match peers.get(peer) {
Some(peer_info) => {
let mut peer_info = peer_info.lock();
peer_info.idle = true;
match peer_info.remote_flow.as_mut() {
Some(&mut (ref mut buf, ref mut flow)) => {
let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit());
@ -505,6 +528,15 @@ impl LightProtocol {
}
}
}
fn tick_handlers(&self, io: &IoContext) {
for handler in &self.handlers {
handler.tick(&TickCtx {
io: io,
proto: self,
})
}
}
}
impl LightProtocol {
@ -603,6 +635,7 @@ impl LightProtocol {
remote_flow: remote_flow,
sent_head: pending.sent_head,
last_update: pending.last_update,
idle: true,
}));
for handler in &self.handlers {
@ -1123,7 +1156,10 @@ fn punish(peer: PeerId, io: &IoContext, e: Error) {
impl NetworkProtocolHandler for LightProtocol {
fn initialize(&self, io: &NetworkContext) {
io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS).expect("Error registering sync timer.");
io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS)
.expect("Error registering sync timer.");
io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL_MS)
.expect("Error registering sync timer.");
}
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
@ -1141,6 +1177,7 @@ impl NetworkProtocolHandler for LightProtocol {
fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
match timer {
TIMEOUT => self.timeout_check(io),
TICK_TIMEOUT => self.tick_handlers(io),
_ => warn!(target: "les", "received timeout on unknown token {}", timer),
}
}

View File

@ -12,6 +12,7 @@ base_path = "$HOME/.parity"
db_path = "$HOME/.parity/chains"
keys_path = "$HOME/.parity/keys"
identity = ""
light = false
[account]
unlock = ["0xdeadbeefcafe0000000000000000000000000000"]

View File

@ -32,12 +32,10 @@ use ethcore::verification::queue::VerifierSettings;
use ethsync::SyncConfig;
use informant::Informant;
use updater::{UpdatePolicy, Updater};
use parity_reactor::{EventLoop, EventLoopHandle};
use parity_reactor::EventLoop;
use hash_fetch::fetch::{Fetch, Client as FetchClient};
use rpc::{HttpServer, IpcServer, HttpConfiguration, IpcConfiguration};
use signer::SignerServer;
use dapps::WebappServer;
use rpc::{HttpConfiguration, IpcConfiguration};
use params::{
SpecType, Pruning, AccountsConfig, GasPricerConfig, MinerExtras, Switch,
tracing_switch_to_bool, fatdb_switch_to_bool, mode_switch_to_bool
@ -444,16 +442,10 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
}
// Handle exit
let restart = wait_for_exit(
panic_handler,
http_server,
ipc_server,
dapps_server,
signer_server,
event_loop.into(),
updater,
can_restart,
);
let restart = wait_for_exit(panic_handler, Some(updater), can_restart);
// drop this stuff as soon as exit detected.
drop((http_server, ipc_server, dapps_server, signer_server, event_loop));
info!("Finishing work, please wait...");
@ -523,12 +515,7 @@ fn build_create_account_hint(spec: &SpecType, keys: &str) -> String {
fn wait_for_exit(
panic_handler: Arc<PanicHandler>,
_http_server: Option<HttpServer>,
_ipc_server: Option<IpcServer>,
_dapps_server: Option<WebappServer>,
_signer_server: Option<SignerServer>,
_event_loop: EventLoopHandle,
updater: Arc<Updater>,
updater: Option<Arc<Updater>>,
can_restart: bool
) -> bool {
let exit = Arc::new((Mutex::new(false), Condvar::new()));
@ -541,12 +528,14 @@ fn wait_for_exit(
let e = exit.clone();
panic_handler.on_panic(move |_reason| { e.1.notify_all(); });
// Handle updater wanting to restart us
if can_restart {
let e = exit.clone();
updater.set_exit_handler(move || { *e.0.lock() = true; e.1.notify_all(); });
} else {
updater.set_exit_handler(|| info!("Update installed; ready for restart."));
if let Some(updater) = updater {
// Handle updater wanting to restart us
if can_restart {
let e = exit.clone();
updater.set_exit_handler(move || { *e.0.lock() = true; e.1.notify_all(); });
} else {
updater.set_exit_handler(|| info!("Update installed; ready for restart."));
}
}
// Wait for signal

View File

@ -33,6 +33,8 @@ use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig};
use std::str::FromStr;
use parking_lot::RwLock;
use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT};
use light::client::LightChainClient;
use light::Provider;
use light::net::{LightProtocol, Params as LightParams, Capabilities, Handler as LightHandler, EventContext};
/// Parity sync protocol
@ -567,3 +569,101 @@ pub struct ServiceConfiguration {
/// IPC path.
pub io_path: String,
}
/// Configuration for the light sync.
pub struct LightSyncParams<L> {
/// Network configuration.
pub network_config: BasicNetworkConfiguration,
/// Light client to sync to.
pub client: Arc<L>,
/// Network ID.
pub network_id: u64,
/// Subprotocol name.
pub subprotocol_name: [u8; 3],
}
/// Service for light synchronization.
pub struct LightSync {
proto: Arc<LightProtocol>,
network: NetworkService,
subprotocol_name: [u8; 3],
}
impl LightSync {
/// Create a new light sync service.
pub fn new<L>(params: LightSyncParams<L>) -> Result<Self, NetworkError>
where L: LightChainClient + Provider + 'static
{
use light_sync::LightSync as SyncHandler;
// initialize light protocol handler and attach sync module.
let light_proto = {
let light_params = LightParams {
network_id: params.network_id,
flow_params: Default::default(), // or `None`?
capabilities: Capabilities {
serve_headers: false,
serve_chain_since: None,
serve_state_since: None,
tx_relay: false,
},
};
let mut light_proto = LightProtocol::new(params.client.clone(), light_params);
let sync_handler = try!(SyncHandler::new(params.client.clone()));
light_proto.add_handler(Box::new(sync_handler));
Arc::new(light_proto)
};
let service = try!(NetworkService::new(params.network_config));
Ok(LightSync {
proto: light_proto,
network: service,
subprotocol_name: params.subprotocol_name,
})
}
}
impl ManageNetwork for LightSync {
fn accept_unreserved_peers(&self) {
self.network.set_non_reserved_mode(NonReservedPeerMode::Accept);
}
fn deny_unreserved_peers(&self) {
self.network.set_non_reserved_mode(NonReservedPeerMode::Deny);
}
fn remove_reserved_peer(&self, peer: String) -> Result<(), String> {
self.network.remove_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
}
fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
self.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
}
fn start_network(&self) {
match self.network.start() {
Err(NetworkError::StdIo(ref e)) if e.kind() == io::ErrorKind::AddrInUse => warn!("Network port {:?} is already in use, make sure that another instance of an Ethereum client is not running or change the port using the --port option.", self.network.config().listen_address.expect("Listen address is not set.")),
Err(err) => warn!("Error starting network: {}", err),
_ => {},
}
let light_proto = self.proto.clone();
self.network.register_protocol(light_proto, self.subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS)
.unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e));
}
fn stop_network(&self) {
self.proto.abort();
if let Err(e) = self.network.stop() {
warn!("Error stopping network: {}", e);
}
}
fn network_config(&self) -> NetworkConfiguration {
NetworkConfiguration::from(self.network.config().clone())
}
}

View File

@ -58,6 +58,8 @@ mod sync_io;
mod snapshot;
mod transactions_stats;
pub mod light_sync;
#[cfg(test)]
mod tests;
@ -70,8 +72,11 @@ mod api {
#[cfg(not(feature = "ipc"))]
mod api;
pub use api::{EthSync, Params, SyncProvider, ManageNetwork, SyncConfig,
ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP, TransactionStats};
pub use api::{
EthSync, Params, SyncProvider, ManageNetwork, SyncConfig,
ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP, TransactionStats,
LightSync, LightSyncParams,
};
pub use chain::{SyncStatus, SyncState};
pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError};

459
sync/src/light_sync/mod.rs Normal file
View File

@ -0,0 +1,459 @@
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Light client synchronization.
//!
//! This will synchronize the header chain using LES messages.
//! Dataflow is largely one-directional as headers are pushed into
//! the light client queue for import. Where possible, they are batched
//! in groups.
//!
//! This is written assuming that the client and sync service are running
//! in the same binary; unlike a full node which might communicate via IPC.
use std::collections::HashMap;
use std::mem;
use std::sync::Arc;
use light::client::LightChainClient;
use light::net::{
Announcement, Handler, BasicContext, EventContext,
Capabilities, ReqId, Status,
};
use light::request;
use network::PeerId;
use util::{Bytes, U256, H256, Mutex, RwLock};
use rand::{Rng, OsRng};
use self::sync_round::{AbortReason, SyncRound, ResponseContext};
mod response;
mod sync_round;
/// Peer chain info.
#[derive(Clone)]
struct ChainInfo {
head_td: U256,
head_hash: H256,
head_num: u64,
}
struct Peer {
status: ChainInfo,
}
impl Peer {
// Create a new peer.
fn new(chain_info: ChainInfo) -> Self {
Peer {
status: chain_info,
}
}
}
// search for a common ancestor with the best chain.
enum AncestorSearch {
Queued(u64), // queued to search for blocks starting from here.
Awaiting(ReqId, u64, request::Headers), // awaiting response for this request.
Prehistoric, // prehistoric block found. TODO: start to roll back CHTs.
FoundCommon(u64, H256), // common block found.
Genesis, // common ancestor is the genesis.
}
impl AncestorSearch {
fn begin(best_num: u64) -> Self {
match best_num {
0 => AncestorSearch::Genesis,
_ => AncestorSearch::Queued(best_num),
}
}
fn process_response<L>(self, ctx: &ResponseContext, client: &L) -> AncestorSearch
where L: LightChainClient
{
let first_num = client.chain_info().first_block_number.unwrap_or(0);
match self {
AncestorSearch::Awaiting(id, start, req) => {
if &id == ctx.req_id() {
match response::decode_and_verify(ctx.data(), &req) {
Ok(headers) => {
for header in &headers {
if client.is_known(&header.hash()) {
debug!(target: "sync", "Found common ancestor with best chain");
return AncestorSearch::FoundCommon(header.number(), header.hash());
}
if header.number() <= first_num {
debug!(target: "sync", "Prehistoric common ancestor with best chain.");
return AncestorSearch::Prehistoric;
}
}
AncestorSearch::Queued(start - headers.len() as u64)
}
Err(e) => {
trace!(target: "sync", "Bad headers response from {}: {}", ctx.responder(), e);
ctx.punish_responder();
AncestorSearch::Queued(start)
}
}
} else {
AncestorSearch::Awaiting(id, start, req)
}
}
other => other,
}
}
fn dispatch_request<F>(self, mut dispatcher: F) -> AncestorSearch
where F: FnMut(request::Headers) -> Option<ReqId>
{
const BATCH_SIZE: usize = 64;
match self {
AncestorSearch::Queued(start) => {
let req = request::Headers {
start: start.into(),
max: ::std::cmp::min(start as usize, BATCH_SIZE),
skip: 0,
reverse: true,
};
match dispatcher(req.clone()) {
Some(req_id) => AncestorSearch::Awaiting(req_id, start, req),
None => AncestorSearch::Queued(start),
}
}
other => other,
}
}
}
// synchronization state machine.
enum SyncState {
// Idle (waiting for peers)
Idle,
// searching for common ancestor with best chain.
// queue should be cleared at this phase.
AncestorSearch(AncestorSearch),
// Doing sync rounds.
Rounds(SyncRound),
}
struct ResponseCtx<'a> {
peer: PeerId,
req_id: ReqId,
ctx: &'a BasicContext,
data: &'a [Bytes],
}
impl<'a> ResponseContext for ResponseCtx<'a> {
fn responder(&self) -> PeerId { self.peer }
fn req_id(&self) -> &ReqId { &self.req_id }
fn data(&self) -> &[Bytes] { self.data }
fn punish_responder(&self) { self.ctx.disable_peer(self.peer) }
}
/// Light client synchronization manager. See module docs for more details.
pub struct LightSync<L: LightChainClient> {
best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network.
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization.
client: Arc<L>,
rng: Mutex<OsRng>,
state: Mutex<SyncState>,
}
impl<L: LightChainClient> Handler for LightSync<L> {
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
let our_best = self.client.chain_info().best_block_number;
if !capabilities.serve_headers || status.head_num <= our_best {
trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer());
ctx.disconnect_peer(ctx.peer());
return;
}
let chain_info = ChainInfo {
head_td: status.head_td,
head_hash: status.head_hash,
head_num: status.head_num,
};
{
let mut best = self.best_seen.lock();
if best.as_ref().map_or(true, |b| status.head_td > b.1) {
*best = Some((status.head_hash, status.head_td));
}
}
self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info)));
self.maintain_sync(ctx.as_basic());
}
fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) {
let peer_id = ctx.peer();
let peer = match self.peers.write().remove(&peer_id).map(|p| p.into_inner()) {
Some(peer) => peer,
None => return,
};
trace!(target: "sync", "peer {} disconnecting", peer_id);
let new_best = {
let mut best = self.best_seen.lock();
let peer_best = (peer.status.head_hash, peer.status.head_td);
if best.as_ref().map_or(false, |b| b == &peer_best) {
// search for next-best block.
let next_best: Option<(H256, U256)> = self.peers.read().values()
.map(|p| p.lock())
.map(|p| (p.status.head_hash, p.status.head_td))
.fold(None, |acc, x| match acc {
Some(acc) => if x.1 > acc.1 { Some(x) } else { Some(acc) },
None => Some(x),
});
*best = next_best;
}
best.clone()
};
if new_best.is_none() {
debug!(target: "sync", "No peers remain. Reverting to idle");
*self.state.lock() = SyncState::Idle;
} else {
let mut state = self.state.lock();
*state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Idle => SyncState::Idle,
SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search),
SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(unfulfilled)),
};
}
self.maintain_sync(ctx.as_basic());
}
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
let last_td = {
let peers = self.peers.read();
match peers.get(&ctx.peer()) {
None => return,
Some(peer) => {
let mut peer = peer.lock();
let last_td = peer.status.head_td;
peer.status = ChainInfo {
head_td: announcement.head_td,
head_hash: announcement.head_hash,
head_num: announcement.head_num,
};
last_td
}
}
};
trace!(target: "sync", "Announcement from peer {}: new chain head {:?}, reorg depth {}",
ctx.peer(), (announcement.head_hash, announcement.head_num), announcement.reorg_depth);
if last_td > announcement.head_td {
trace!(target: "sync", "Peer {} moved backwards.", ctx.peer());
self.peers.write().remove(&ctx.peer());
ctx.disconnect_peer(ctx.peer());
}
{
let mut best = self.best_seen.lock();
if best.as_ref().map_or(true, |b| announcement.head_td > b.1) {
*best = Some((announcement.head_hash, announcement.head_td));
}
}
self.maintain_sync(ctx.as_basic());
}
fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) {
if !self.peers.read().contains_key(&ctx.peer()) {
return
}
{
let mut state = self.state.lock();
let ctx = ResponseCtx {
peer: ctx.peer(),
req_id: req_id,
ctx: ctx.as_basic(),
data: headers,
};
*state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Idle => SyncState::Idle,
SyncState::AncestorSearch(search) =>
SyncState::AncestorSearch(search.process_response(&ctx, &*self.client)),
SyncState::Rounds(round) => SyncState::Rounds(round.process_response(&ctx)),
};
}
self.maintain_sync(ctx.as_basic());
}
fn tick(&self, ctx: &BasicContext) {
self.maintain_sync(ctx);
}
}
// private helpers
impl<L: LightChainClient> LightSync<L> {
// Begins a search for the common ancestor and our best block.
// does not lock state, instead has a mutable reference to it passed.
fn begin_search(&self, state: &mut SyncState) {
if let None = *self.best_seen.lock() {
// no peers.
*state = SyncState::Idle;
return;
}
trace!(target: "sync", "Beginning search for common ancestor");
self.client.clear_queue();
let chain_info = self.client.chain_info();
*state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number));
}
fn maintain_sync(&self, ctx: &BasicContext) {
const DRAIN_AMOUNT: usize = 128;
debug!(target: "sync", "Maintaining sync.");
let mut state = self.state.lock();
// drain any pending blocks into the queue.
{
let mut sink = Vec::with_capacity(DRAIN_AMOUNT);
'a:
loop {
let queue_info = self.client.queue_info();
if queue_info.is_full() { break }
*state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Rounds(round)
=> SyncState::Rounds(round.drain(&mut sink, Some(DRAIN_AMOUNT))),
other => other,
};
if sink.is_empty() { break }
for header in sink.drain(..) {
if let Err(e) = self.client.queue_header(header) {
debug!(target: "sync", "Found bad header ({:?}). Reset to search state.", e);
self.begin_search(&mut state);
break 'a;
}
}
}
}
// handle state transitions.
{
match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Rounds(SyncRound::Abort(reason)) => {
match reason {
AbortReason::BadScaffold(bad_peers) => {
debug!(target: "sync", "Disabling peers responsible for bad scaffold");
for peer in bad_peers {
ctx.disable_peer(peer);
}
}
AbortReason::NoResponses => {}
}
debug!(target: "sync", "Beginning search after aborted sync round");
self.begin_search(&mut state);
}
SyncState::AncestorSearch(AncestorSearch::FoundCommon(num, hash)) => {
// TODO: compare to best block and switch to another downloading
// method when close.
*state = SyncState::Rounds(SyncRound::begin(num, hash));
}
SyncState::AncestorSearch(AncestorSearch::Genesis) => {
// Same here.
let g_hash = self.client.chain_info().genesis_hash;
*state = SyncState::Rounds(SyncRound::begin(0, g_hash));
}
SyncState::Idle => self.begin_search(&mut state),
other => *state = other, // restore displaced state.
}
}
// allow dispatching of requests.
// TODO: maybe wait until the amount of cumulative requests remaining is high enough
// to avoid pumping the failure rate.
{
let peers = self.peers.read();
let mut peer_ids: Vec<_> = peers.keys().cloned().collect();
let mut rng = self.rng.lock();
// naive request dispatcher: just give to any peer which says it will
// give us responses.
let dispatcher = move |req: request::Headers| {
rng.shuffle(&mut peer_ids);
for peer in &peer_ids {
if ctx.max_requests(*peer, request::Kind::Headers) >= req.max {
match ctx.request_from(*peer, request::Request::Headers(req.clone())) {
Ok(id) => {
return Some(id)
}
Err(e) =>
trace!(target: "sync", "Error requesting headers from viable peer: {}", e),
}
}
}
None
};
*state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Rounds(round) =>
SyncState::Rounds(round.dispatch_requests(dispatcher)),
SyncState::AncestorSearch(search) =>
SyncState::AncestorSearch(search.dispatch_request(dispatcher)),
other => other,
};
}
}
}
// public API
impl<L: LightChainClient> LightSync<L> {
/// Create a new instance of `LightSync`.
///
/// This won't do anything until registered as a handler
/// so it can act on events.
pub fn new(client: Arc<L>) -> Result<Self, ::std::io::Error> {
Ok(LightSync {
best_seen: Mutex::new(None),
peers: RwLock::new(HashMap::new()),
client: client,
rng: Mutex::new(try!(OsRng::new())),
state: Mutex::new(SyncState::Idle),
})
}
}

View File

@ -0,0 +1,254 @@
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Helpers for decoding and verifying responses for headers.
use std::fmt;
use ethcore::header::Header;
use light::request::{HashOrNumber, Headers as HeadersRequest};
use rlp::{DecoderError, UntrustedRlp, View};
use util::{Bytes, H256};
/// Errors found when decoding headers and verifying with basic constraints.
#[derive(Debug, PartialEq)]
pub enum BasicError {
/// Wrong skip value: expected, found (if any).
WrongSkip(u64, Option<u64>),
/// Wrong start number.
WrongStartNumber(u64, u64),
/// Wrong start hash.
WrongStartHash(H256, H256),
/// Too many headers.
TooManyHeaders(usize, usize),
/// Decoder error.
Decoder(DecoderError),
}
impl From<DecoderError> for BasicError {
fn from(err: DecoderError) -> Self {
BasicError::Decoder(err)
}
}
impl fmt::Display for BasicError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
try!(write!(f, "Header response verification error: "));
match *self {
BasicError::WrongSkip(ref exp, ref got)
=> write!(f, "wrong skip (expected {}, got {:?})", exp, got),
BasicError::WrongStartNumber(ref exp, ref got)
=> write!(f, "wrong start number (expected {}, got {})", exp, got),
BasicError::WrongStartHash(ref exp, ref got)
=> write!(f, "wrong start hash (expected {}, got {})", exp, got),
BasicError::TooManyHeaders(ref max, ref got)
=> write!(f, "too many headers (max {}, got {})", max, got),
BasicError::Decoder(ref err)
=> write!(f, "{}", err),
}
}
}
/// Request verification constraint.
pub trait Constraint {
type Error;
/// Verify headers against this.
fn verify(&self, headers: &[Header], reverse: bool) -> Result<(), Self::Error>;
}
/// Decode a response and do basic verification against a request.
pub fn decode_and_verify(headers: &[Bytes], request: &HeadersRequest) -> Result<Vec<Header>, BasicError> {
let headers: Vec<_> = try!(headers.iter().map(|x| UntrustedRlp::new(&x).as_val()).collect());
let reverse = request.reverse;
try!(Max(request.max).verify(&headers, reverse));
match request.start {
HashOrNumber::Number(ref num) => try!(StartsAtNumber(*num).verify(&headers, reverse)),
HashOrNumber::Hash(ref hash) => try!(StartsAtHash(*hash).verify(&headers, reverse)),
}
try!(SkipsBetween(request.skip).verify(&headers, reverse));
Ok(headers)
}
struct StartsAtNumber(u64);
struct StartsAtHash(H256);
struct SkipsBetween(u64);
struct Max(usize);
impl Constraint for StartsAtNumber {
type Error = BasicError;
fn verify(&self, headers: &[Header], _reverse: bool) -> Result<(), BasicError> {
headers.first().map_or(Ok(()), |h| {
if h.number() == self.0 {
Ok(())
} else {
Err(BasicError::WrongStartNumber(self.0, h.number()))
}
})
}
}
impl Constraint for StartsAtHash {
type Error = BasicError;
fn verify(&self, headers: &[Header], _reverse: bool) -> Result<(), BasicError> {
headers.first().map_or(Ok(()), |h| {
if h.hash() == self.0 {
Ok(())
} else {
Err(BasicError::WrongStartHash(self.0, h.hash()))
}
})
}
}
impl Constraint for SkipsBetween {
type Error = BasicError;
fn verify(&self, headers: &[Header], reverse: bool) -> Result<(), BasicError> {
for pair in headers.windows(2) {
let (low, high) = if reverse { (&pair[1], &pair[0]) } else { (&pair[0], &pair[1]) };
if low.number() >= high.number() { return Err(BasicError::WrongSkip(self.0, None)) }
let skip = (high.number() - low.number()) - 1;
if skip != self.0 { return Err(BasicError::WrongSkip(self.0, Some(skip))) }
}
Ok(())
}
}
impl Constraint for Max {
type Error = BasicError;
fn verify(&self, headers: &[Header], _reverse: bool) -> Result<(), BasicError> {
match headers.len() > self.0 {
true => Err(BasicError::TooManyHeaders(self.0, headers.len())),
false => Ok(())
}
}
}
#[cfg(test)]
mod tests {
use ethcore::header::Header;
use light::request::Headers as HeadersRequest;
use super::*;
#[test]
fn sequential_forward() {
let request = HeadersRequest {
start: 10.into(),
max: 30,
skip: 0,
reverse: false,
};
let mut parent_hash = None;
let headers: Vec<_> = (0..25).map(|x| x + 10).map(|x| {
let mut header = Header::default();
header.set_number(x);
if let Some(parent_hash) = parent_hash {
header.set_parent_hash(parent_hash);
}
parent_hash = Some(header.hash());
::rlp::encode(&header).to_vec()
}).collect();
assert!(decode_and_verify(&headers, &request).is_ok());
}
#[test]
fn sequential_backward() {
let request = HeadersRequest {
start: 34.into(),
max: 30,
skip: 0,
reverse: true,
};
let mut parent_hash = None;
let headers: Vec<_> = (0..25).map(|x| x + 10).rev().map(|x| {
let mut header = Header::default();
header.set_number(x);
if let Some(parent_hash) = parent_hash {
header.set_parent_hash(parent_hash);
}
parent_hash = Some(header.hash());
::rlp::encode(&header).to_vec()
}).collect();
assert!(decode_and_verify(&headers, &request).is_ok());
}
#[test]
fn too_many() {
let request = HeadersRequest {
start: 10.into(),
max: 20,
skip: 0,
reverse: false,
};
let mut parent_hash = None;
let headers: Vec<_> = (0..25).map(|x| x + 10).map(|x| {
let mut header = Header::default();
header.set_number(x);
if let Some(parent_hash) = parent_hash {
header.set_parent_hash(parent_hash);
}
parent_hash = Some(header.hash());
::rlp::encode(&header).to_vec()
}).collect();
assert_eq!(decode_and_verify(&headers, &request), Err(BasicError::TooManyHeaders(20, 25)));
}
#[test]
fn wrong_skip() {
let request = HeadersRequest {
start: 10.into(),
max: 30,
skip: 5,
reverse: false,
};
let headers: Vec<_> = (0..25).map(|x| x * 3).map(|x| x + 10).map(|x| {
let mut header = Header::default();
header.set_number(x);
::rlp::encode(&header).to_vec()
}).collect();
assert_eq!(decode_and_verify(&headers, &request), Err(BasicError::WrongSkip(5, Some(2))));
}
}

View File

@ -0,0 +1,456 @@
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Header download state machine.
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use ethcore::header::Header;
use light::net::ReqId;
use light::request::Headers as HeadersRequest;
use network::PeerId;
use util::{Bytes, H256};
use super::response;
// amount of blocks between each scaffold entry.
// TODO: move these into parameters for `RoundStart::new`?
const ROUND_SKIP: u64 = 255;
// amount of scaffold frames: these are the blank spaces in "X___X___X"
const ROUND_FRAMES: usize = 255;
// number of attempts to make to get a full scaffold for a sync round.
const SCAFFOLD_ATTEMPTS: usize = 3;
/// Context for a headers response.
pub trait ResponseContext {
/// Get the peer who sent this response.
fn responder(&self) -> PeerId;
/// Get the request ID this response corresponds to.
fn req_id(&self) -> &ReqId;
/// Get the (unverified) response data.
fn data(&self) -> &[Bytes];
/// Punish the responder.
fn punish_responder(&self);
}
/// Reasons for sync round abort.
#[derive(Debug, Clone)]
pub enum AbortReason {
/// Bad sparse header chain along with a list of peers who contributed to it.
BadScaffold(Vec<PeerId>),
/// No incoming data.
NoResponses,
}
// A request for headers with a known starting header hash.
// and a known parent hash for the last block.
#[derive(PartialEq, Eq)]
struct SubchainRequest {
subchain_parent: (u64, H256),
headers_request: HeadersRequest,
subchain_end: (u64, H256),
downloaded: VecDeque<Header>,
}
// ordered by subchain parent number so pending requests towards the
// front of the round are dispatched first.
impl PartialOrd for SubchainRequest {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.subchain_parent.0
.partial_cmp(&other.subchain_parent.0)
.map(Ordering::reverse)
}
}
impl Ord for SubchainRequest {
fn cmp(&self, other: &Self) -> Ordering {
self.subchain_parent.0.cmp(&other.subchain_parent.0).reverse()
}
}
/// Manages downloading of interior blocks of a sparse header chain.
pub struct Fetcher {
sparse: VecDeque<Header>, // sparse header chain.
requests: BinaryHeap<SubchainRequest>,
complete_requests: HashMap<H256, SubchainRequest>,
pending: HashMap<ReqId, SubchainRequest>,
scaffold_contributors: Vec<PeerId>,
ready: VecDeque<Header>,
end: (u64, H256),
}
impl Fetcher {
// Produce a new fetcher given a sparse headerchain, in ascending order along
// with a list of peers who helped produce the chain.
// The headers must be valid RLP at this point and must have a consistent
// non-zero gap between them. Will abort the round if found wrong.
fn new(sparse_headers: Vec<Header>, contributors: Vec<PeerId>) -> SyncRound {
let mut requests = BinaryHeap::with_capacity(sparse_headers.len() - 1);
for pair in sparse_headers.windows(2) {
let low_rung = &pair[0];
let high_rung = &pair[1];
let diff = high_rung.number() - low_rung.number();
// should never happen as long as we verify the gaps
// gotten from SyncRound::Start
if diff < 2 { continue }
let needed_headers = HeadersRequest {
start: high_rung.parent_hash().clone().into(),
max: diff as usize - 1,
skip: 0,
reverse: true,
};
requests.push(SubchainRequest {
headers_request: needed_headers,
subchain_end: (high_rung.number() - 1, *high_rung.parent_hash()),
downloaded: VecDeque::new(),
subchain_parent: (low_rung.number(), low_rung.hash()),
});
}
let end = match sparse_headers.last().map(|h| (h.number(), h.hash())) {
Some(end) => end,
None => return SyncRound::abort(AbortReason::BadScaffold(contributors)),
};
SyncRound::Fetch(Fetcher {
sparse: sparse_headers.into(),
requests: requests,
complete_requests: HashMap::new(),
pending: HashMap::new(),
scaffold_contributors: contributors,
ready: VecDeque::new(),
end: end,
})
}
// collect complete requests and their subchain from the sparse header chain
// into the ready set in order.
fn collect_ready(&mut self) {
loop {
let start_hash = match self.sparse.front() {
Some(first) => first.hash(),
None => break,
};
match self.complete_requests.remove(&start_hash) {
None => break,
Some(complete_req) => {
self.ready.push_back(self.sparse.pop_front().expect("first known to exist; qed"));
self.ready.extend(complete_req.downloaded);
}
}
}
// frames are between two sparse headers and keyed by subchain parent, so the last
// remaining will be the last header.
if self.sparse.len() == 1 {
self.ready.push_back(self.sparse.pop_back().expect("sparse known to have one entry; qed"))
}
}
fn process_response<R: ResponseContext>(mut self, ctx: &R) -> SyncRound {
let mut request = match self.pending.remove(ctx.req_id()) {
Some(request) => request,
None => return SyncRound::Fetch(self),
};
let headers = ctx.data();
if headers.len() == 0 {
trace!(target: "sync", "Punishing peer {} for empty response", ctx.responder());
ctx.punish_responder();
return SyncRound::Fetch(self);
}
match response::decode_and_verify(headers, &request.headers_request) {
Err(e) => {
trace!(target: "sync", "Punishing peer {} for invalid response ({})", ctx.responder(), e);
ctx.punish_responder();
// TODO: track number of attempts per request,
// abort if failure rate too high.
self.requests.push(request);
SyncRound::Fetch(self)
}
Ok(headers) => {
let mut parent_hash = None;
for header in headers {
if parent_hash.as_ref().map_or(false, |h| h != &header.hash()) {
trace!(target: "sync", "Punishing peer {} for parent mismatch", ctx.responder());
ctx.punish_responder();
self.requests.push(request);
return SyncRound::Fetch(self);
}
// incrementally update the frame request as we go so we can
// return at any time in the loop.
parent_hash = Some(header.parent_hash().clone());
request.headers_request.start = header.parent_hash().clone().into();
request.headers_request.max -= 1;
request.downloaded.push_front(header);
}
let subchain_parent = request.subchain_parent.1;
if request.headers_request.max == 0 {
if parent_hash.map_or(true, |hash| hash != subchain_parent) {
let abort = AbortReason::BadScaffold(self.scaffold_contributors);
return SyncRound::Abort(abort);
}
self.complete_requests.insert(subchain_parent, request);
self.collect_ready();
}
// state transition not triggered until drain is finished.
(SyncRound::Fetch(self))
}
}
}
fn requests_abandoned(mut self, abandoned: &[ReqId]) -> SyncRound {
for abandoned in abandoned {
match self.pending.remove(abandoned) {
None => {},
Some(req) => self.requests.push(req),
}
}
// TODO: track failure rate and potentially abort.
SyncRound::Fetch(self)
}
fn dispatch_requests<D>(mut self, mut dispatcher: D) -> SyncRound
where D: FnMut(HeadersRequest) -> Option<ReqId>
{
while let Some(pending_req) = self.requests.pop() {
match dispatcher(pending_req.headers_request.clone()) {
Some(req_id) => {
trace!(target: "sync", "Assigned request for subchain ({} -> {})",
pending_req.subchain_parent.0 + 1, pending_req.subchain_end.0);
self.pending.insert(req_id, pending_req);
}
None => {
self.requests.push(pending_req);
break;
}
}
}
SyncRound::Fetch(self)
}
fn drain(mut self, headers: &mut Vec<Header>, max: Option<usize>) -> SyncRound {
let max = ::std::cmp::min(max.unwrap_or(usize::max_value()), self.ready.len());
headers.extend(self.ready.drain(0..max));
if self.sparse.is_empty() && self.ready.is_empty() {
SyncRound::Start(RoundStart::new(self.end))
} else {
SyncRound::Fetch(self)
}
}
}
/// Round started: get stepped header chain.
/// from a start block with number X we request 256 headers stepped by 256 from
/// block X + 1.
pub struct RoundStart {
start_block: (u64, H256),
pending_req: Option<(ReqId, HeadersRequest)>,
sparse_headers: Vec<Header>,
contributors: HashSet<PeerId>,
attempt: usize,
}
impl RoundStart {
fn new(start: (u64, H256)) -> Self {
RoundStart {
start_block: start.clone(),
pending_req: None,
sparse_headers: Vec::new(),
contributors: HashSet::new(),
attempt: 0,
}
}
// called on failed attempt. may trigger a transition after a number of attempts.
// a failed attempt is defined as any time a peer returns invalid or incomplete response
fn failed_attempt(mut self) -> SyncRound {
self.attempt += 1;
if self.attempt >= SCAFFOLD_ATTEMPTS {
if self.sparse_headers.len() > 1 {
Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect())
} else {
SyncRound::Abort(AbortReason::NoResponses)
}
} else {
SyncRound::Start(self)
}
}
fn process_response<R: ResponseContext>(mut self, ctx: &R) -> SyncRound {
let req = match self.pending_req.take() {
Some((id, ref req)) if ctx.req_id() == &id => { req.clone() }
other => {
self.pending_req = other;
return SyncRound::Start(self);
}
};
match response::decode_and_verify(ctx.data(), &req) {
Ok(headers) => {
if self.sparse_headers.len() == 0
&& headers.get(0).map_or(false, |x| x.parent_hash() != &self.start_block.1) {
trace!(target: "sync", "Wrong parent for first header in round");
ctx.punish_responder(); // or should we reset?
}
self.contributors.insert(ctx.responder());
self.sparse_headers.extend(headers);
if self.sparse_headers.len() == ROUND_FRAMES + 1 {
trace!(target: "sync", "Beginning fetch of blocks between {} sparse headers",
self.sparse_headers.len());
return Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect());
}
}
Err(e) => {
trace!(target: "sync", "Punishing peer {} for malformed response ({})", ctx.responder(), e);
ctx.punish_responder();
}
};
self.failed_attempt()
}
fn requests_abandoned(mut self, abandoned: &[ReqId]) -> SyncRound {
match self.pending_req.take() {
Some((id, req)) => {
if abandoned.iter().any(|r| r == &id) {
self.pending_req = None;
self.failed_attempt()
} else {
self.pending_req = Some((id, req));
SyncRound::Start(self)
}
}
None => SyncRound::Start(self),
}
}
fn dispatch_requests<D>(mut self, mut dispatcher: D) -> SyncRound
where D: FnMut(HeadersRequest) -> Option<ReqId>
{
if self.pending_req.is_none() {
// beginning offset + first block expected after last header we have.
let start = (self.start_block.0 + 1)
+ self.sparse_headers.len() as u64 * (ROUND_SKIP + 1);
let headers_request = HeadersRequest {
start: start.into(),
max: (ROUND_FRAMES - 1) - self.sparse_headers.len(),
skip: ROUND_SKIP,
reverse: false,
};
if let Some(req_id) = dispatcher(headers_request.clone()) {
self.pending_req = Some((req_id, headers_request));
}
}
SyncRound::Start(self)
}
}
/// Sync round state machine.
pub enum SyncRound {
/// Beginning a sync round.
Start(RoundStart),
/// Fetching intermediate blocks during a sync round.
Fetch(Fetcher),
/// Aborted.
Abort(AbortReason),
}
impl SyncRound {
fn abort(reason: AbortReason) -> Self {
trace!(target: "sync", "Aborting sync round: {:?}", reason);
SyncRound::Abort(reason)
}
/// Begin sync rounds from a starting block.
pub fn begin(num: u64, hash: H256) -> Self {
SyncRound::Start(RoundStart::new((num, hash)))
}
/// Process an answer to a request. Unknown requests will be ignored.
pub fn process_response<R: ResponseContext>(self, ctx: &R) -> Self {
match self {
SyncRound::Start(round_start) => round_start.process_response(ctx),
SyncRound::Fetch(fetcher) => fetcher.process_response(ctx),
other => other,
}
}
/// Return unfulfilled requests from disconnected peer. Unknown requests will be ignored.
pub fn requests_abandoned(self, abandoned: &[ReqId]) -> Self {
match self {
SyncRound::Start(round_start) => round_start.requests_abandoned(abandoned),
SyncRound::Fetch(fetcher) => fetcher.requests_abandoned(abandoned),
other => other,
}
}
/// Dispatch pending requests. The dispatcher provided will attempt to
/// find a suitable peer to serve the request.
// TODO: have dispatcher take capabilities argument? and return an error as
// to why no suitable peer can be found? (no buffer, no chain heads that high, etc)
pub fn dispatch_requests<D>(self, dispatcher: D) -> Self
where D: FnMut(HeadersRequest) -> Option<ReqId>
{
match self {
SyncRound::Start(round_start) => round_start.dispatch_requests(dispatcher),
SyncRound::Fetch(fetcher) => fetcher.dispatch_requests(dispatcher),
other => other,
}
}
/// Drain up to a maximum number (None -> all) of headers (continuous, starting with a child of
/// the round start block) from the round, starting a new one once finished.
pub fn drain(self, v: &mut Vec<Header>, max: Option<usize>) -> Self {
match self {
SyncRound::Fetch(fetcher) => fetcher.drain(v, max),
other => other,
}
}
}