diff --git a/Cargo.lock b/Cargo.lock index 17928f75b..61d380352 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -522,6 +522,7 @@ dependencies = [ "ethcore-util 1.5.0", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.1.0", + "smallvec 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2000,6 +2001,11 @@ name = "smallvec" version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "smallvec" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "solicit" version = "0.4.4" @@ -2514,6 +2520,7 @@ dependencies = [ "checksum slab 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6dbdd334bd28d328dad1c41b0ea662517883d8880d8533895ef96c8003dec9c4" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum smallvec 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fcc8d19212aacecf95e4a7a2179b26f7aeb9732a915cf01f05b0d3e044865410" +"checksum smallvec 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3a3c84984c278afe61a46e19868e8b23e2ee3be5b3cc6dea6edad4893bc6c841" "checksum solicit 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "172382bac9424588d7840732b250faeeef88942e37b6e35317dce98cafdd75b2" "checksum spmc 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "93bdab61c1a413e591c4d17388ffa859eaff2df27f1e13a5ec8b716700605adf" "checksum stable-heap 0.1.0 (git+https://github.com/carllerche/stable-heap?rev=3c5cd1ca47)" = "" diff --git a/ethcore/light/Cargo.toml b/ethcore/light/Cargo.toml index 37d7034d2..67ae166dc 100644 --- a/ethcore/light/Cargo.toml +++ b/ethcore/light/Cargo.toml @@ -19,7 +19,8 @@ ethcore-io = { path = "../../util/io" } ethcore-ipc = { path = "../../ipc/rpc", optional = true } rlp = { path = "../../util/rlp" } time = "0.1" +smallvec = "0.3.1" [features] default = [] -ipc = ["ethcore-ipc", "ethcore-ipc-codegen"] \ No newline at end of file +ipc = ["ethcore-ipc", "ethcore-ipc-codegen"] diff --git a/ethcore/light/src/client.rs b/ethcore/light/src/client.rs deleted file mode 100644 index 14d032821..000000000 --- a/ethcore/light/src/client.rs +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright 2015, 2016 Parity Technologies (UK) Ltd. -// This file is part of Parity. - -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . - -//! Light client implementation. Stores data from light sync - -use std::sync::Arc; - -use ethcore::engines::Engine; -use ethcore::ids::BlockId; -use ethcore::service::ClientIoMessage; -use ethcore::block_import_error::BlockImportError; -use ethcore::block_status::BlockStatus; -use ethcore::verification::queue::{HeaderQueue, QueueInfo}; -use ethcore::transaction::{SignedTransaction, PendingTransaction}; -use ethcore::blockchain_info::BlockChainInfo; -use ethcore::encoded; - -use io::IoChannel; -use util::hash::{H256, H256FastMap}; -use util::{Bytes, Mutex}; - -use provider::Provider; -use request; - -/// Light client implementation. -pub struct Client { - _engine: Arc, - header_queue: HeaderQueue, - _message_channel: Mutex>, - tx_pool: Mutex>, -} - -impl Client { - /// Import a header as rlp-encoded bytes. - pub fn import_header(&self, bytes: Bytes) -> Result { - let header = ::rlp::decode(&bytes); - - self.header_queue.import(header).map_err(Into::into) - } - - /// Whether the block is already known (but not necessarily part of the canonical chain) - pub fn is_known(&self, _id: BlockId) -> bool { - false - } - - /// Import a local transaction. - pub fn import_own_transaction(&self, tx: SignedTransaction) { - self.tx_pool.lock().insert(tx.hash(), tx); - } - - /// Fetch a vector of all pending transactions. - pub fn pending_transactions(&self) -> Vec { - self.tx_pool.lock().values().cloned().collect() - } - - /// Inquire about the status of a given block (or header). - pub fn status(&self, _id: BlockId) -> BlockStatus { - BlockStatus::Unknown - } - - /// Get the header queue info. - pub fn queue_info(&self) -> QueueInfo { - self.header_queue.queue_info() - } -} - -// dummy implementation -- may draw from canonical cache further on. -impl Provider for Client { - fn chain_info(&self) -> BlockChainInfo { - unimplemented!() - } - - fn reorg_depth(&self, _a: &H256, _b: &H256) -> Option { - None - } - - fn earliest_state(&self) -> Option { - None - } - - fn block_header(&self, _id: BlockId) -> Option { - None - } - - fn block_body(&self, _id: BlockId) -> Option { - None - } - - fn block_receipts(&self, _hash: &H256) -> Option { - None - } - - fn state_proof(&self, _req: request::StateProof) -> Vec { - Vec::new() - } - - fn contract_code(&self, _req: request::ContractCode) -> Bytes { - Vec::new() - } - - fn header_proof(&self, _req: request::HeaderProof) -> Option<(encoded::Header, Vec)> { - None - } - - fn ready_transactions(&self) -> Vec { - Vec::new() - } -} diff --git a/ethcore/light/src/client/cht.rs b/ethcore/light/src/client/cht.rs new file mode 100644 index 000000000..8424e0c31 --- /dev/null +++ b/ethcore/light/src/client/cht.rs @@ -0,0 +1,22 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +//! Canonical hash trie definitions and helper functions. + +/// The size of each CHT. +pub const SIZE: u64 = 2048; + +/// Convert a block number to a CHT number. +pub fn block_to_cht_number(block_num: u64) -> u64 { + (block_num + 1) / SIZE +} diff --git a/ethcore/light/src/client/header_chain.rs b/ethcore/light/src/client/header_chain.rs new file mode 100644 index 000000000..fdb075e04 --- /dev/null +++ b/ethcore/light/src/client/header_chain.rs @@ -0,0 +1,365 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Light client header chain. +//! +//! Unlike a full node's `BlockChain` this doesn't store much in the database. +//! It stores candidates for the last 2048-4096 blocks as well as CHT roots for +//! historical blocks all the way to the genesis. +//! +//! This is separate from the `BlockChain` for two reasons: +//! - It stores only headers (and a pruned subset of them) +//! - To allow for flexibility in the database layout once that's incorporated. +// TODO: use DB instead of memory. DB Layout: just the contents of `candidates`/`headers` +// + +use std::collections::{BTreeMap, HashMap}; + +use client::cht; + +use ethcore::block_status::BlockStatus; +use ethcore::error::BlockError; +use ethcore::ids::BlockId; +use ethcore::views::HeaderView; +use util::{Bytes, H256, U256, HeapSizeOf, Mutex, RwLock}; + +use smallvec::SmallVec; + +/// Store at least this many candidate headers at all times. +/// Also functions as the delay for computing CHTs as they aren't +/// relevant to any blocks we've got in memory. +const HISTORY: u64 = 2048; + +/// Information about a block. +#[derive(Debug, Clone)] +pub struct BlockDescriptor { + /// The block's hash + pub hash: H256, + /// The block's number + pub number: u64, + /// The block's total difficulty. + pub total_difficulty: U256, +} + +// candidate block description. +struct Candidate { + hash: H256, + parent_hash: H256, + total_difficulty: U256, +} + +struct Entry { + candidates: SmallVec<[Candidate; 3]>, // 3 arbitrarily chosen + canonical_hash: H256, +} + +impl HeapSizeOf for Entry { + fn heap_size_of_children(&self) -> usize { + match self.candidates.spilled() { + false => 0, + true => self.candidates.capacity() * ::std::mem::size_of::(), + } + } +} + +/// Header chain. See module docs for more details. +pub struct HeaderChain { + genesis_header: Bytes, // special-case the genesis. + candidates: RwLock>, + headers: RwLock>, + best_block: RwLock, + cht_roots: Mutex>, +} + +impl HeaderChain { + /// Create a new header chain given this genesis block. + pub fn new(genesis: &[u8]) -> Self { + let g_view = HeaderView::new(genesis); + + HeaderChain { + genesis_header: genesis.to_owned(), + best_block: RwLock::new(BlockDescriptor { + hash: g_view.hash(), + number: 0, + total_difficulty: g_view.difficulty(), + }), + candidates: RwLock::new(BTreeMap::new()), + headers: RwLock::new(HashMap::new()), + cht_roots: Mutex::new(Vec::new()), + } + } + + /// Insert a pre-verified header. + /// + /// This blindly trusts that the data given to it is + /// a) valid RLP encoding of a header and + /// b) has sensible data contained within it. + pub fn insert(&self, header: Bytes) -> Result<(), BlockError> { + let view = HeaderView::new(&header); + let hash = view.hash(); + let number = view.number(); + let parent_hash = view.parent_hash(); + + // hold candidates the whole time to guard import order. + let mut candidates = self.candidates.write(); + + // find parent details. + let parent_td = + if number == 1 { + let g_view = HeaderView::new(&self.genesis_header); + g_view.difficulty() + } else { + candidates.get(&(number - 1)) + .and_then(|entry| entry.candidates.iter().find(|c| c.hash == parent_hash)) + .map(|c| c.total_difficulty) + .ok_or_else(|| BlockError::UnknownParent(parent_hash))? + }; + + let total_difficulty = parent_td + view.difficulty(); + + // insert headers and candidates entries. + candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash}) + .candidates.push(Candidate { + hash: hash, + parent_hash: parent_hash, + total_difficulty: total_difficulty, + }); + + self.headers.write().insert(hash, header.clone()); + + // reorganize ancestors so canonical entries are first in their + // respective candidates vectors. + if self.best_block.read().total_difficulty < total_difficulty { + let mut canon_hash = hash; + for (_, entry) in candidates.iter_mut().rev().skip_while(|&(height, _)| *height > number) { + if entry.canonical_hash == canon_hash { break; } + + let canon = entry.candidates.iter().find(|x| x.hash == canon_hash) + .expect("blocks are only inserted if parent is present; or this is the block we just added; qed"); + + // what about reorgs > cht::SIZE + HISTORY? + // resetting to the last block of a given CHT should be possible. + canon_hash = canon.parent_hash; + } + + *self.best_block.write() = BlockDescriptor { + hash: hash, + number: number, + total_difficulty: total_difficulty, + }; + + // produce next CHT root if it's time. + let earliest_era = *candidates.keys().next().expect("at least one era just created; qed"); + if earliest_era + HISTORY + cht::SIZE <= number { + let mut values = Vec::with_capacity(cht::SIZE as usize); + { + let mut headers = self.headers.write(); + for i in (0..cht::SIZE).map(|x| x + earliest_era) { + let era_entry = candidates.remove(&i) + .expect("all eras are sequential with no gaps; qed"); + + for ancient in &era_entry.candidates { + headers.remove(&ancient.hash); + } + + values.push(( + ::rlp::encode(&i).to_vec(), + ::rlp::encode(&era_entry.canonical_hash).to_vec(), + )); + } + } + + let cht_root = ::util::triehash::trie_root(values); + debug!(target: "chain", "Produced CHT {} root: {:?}", (earliest_era - 1) % cht::SIZE, cht_root); + + self.cht_roots.lock().push(cht_root); + } + } + + Ok(()) + } + + /// Get a block header. In the case of query by number, only canonical blocks + /// will be returned. + pub fn get_header(&self, id: BlockId) -> Option { + match id { + BlockId::Earliest | BlockId::Number(0) => Some(self.genesis_header.clone()), + BlockId::Hash(hash) => self.headers.read().get(&hash).map(|x| x.to_vec()), + BlockId::Number(num) => { + if self.best_block.read().number < num { return None } + + self.candidates.read().get(&num).map(|entry| entry.canonical_hash) + .and_then(|hash| self.headers.read().get(&hash).map(|x| x.to_vec())) + } + BlockId::Latest | BlockId::Pending => { + let hash = self.best_block.read().hash; + self.headers.read().get(&hash).map(|x| x.to_vec()) + } + } + } + + /// Get the nth CHT root, if it's been computed. + /// + /// CHT root 0 is from block `1..2048`. + /// CHT root 1 is from block `2049..4096` + /// and so on. + /// + /// This is because it's assumed that the genesis hash is known, + /// so including it within a CHT would be redundant. + pub fn cht_root(&self, n: usize) -> Option { + self.cht_roots.lock().get(n).map(|h| h.clone()) + } + + /// Get the genesis hash. + pub fn genesis_hash(&self) -> H256 { + ::util::Hashable::sha3(&self.genesis_header) + } + + /// Get the best block's data. + pub fn best_block(&self) -> BlockDescriptor { + self.best_block.read().clone() + } + + /// If there is a gap between the genesis and the rest + /// of the stored blocks, return the first post-gap block. + pub fn first_block(&self) -> Option { + let candidates = self.candidates.read(); + match candidates.iter().next() { + None | Some((&1, _)) => None, + Some((&height, entry)) => Some(BlockDescriptor { + number: height, + hash: entry.canonical_hash, + total_difficulty: entry.candidates.iter().find(|x| x.hash == entry.canonical_hash) + .expect("entry always stores canonical candidate; qed").total_difficulty, + }) + } + } + + /// Get block status. + pub fn status(&self, hash: &H256) -> BlockStatus { + match self.headers.read().contains_key(hash) { + true => BlockStatus::InChain, + false => BlockStatus::Unknown, + } + } +} + +impl HeapSizeOf for HeaderChain { + fn heap_size_of_children(&self) -> usize { + self.candidates.read().heap_size_of_children() + + self.headers.read().heap_size_of_children() + + self.cht_roots.lock().heap_size_of_children() + } +} + +#[cfg(test)] +mod tests { + use super::HeaderChain; + use ethcore::ids::BlockId; + use ethcore::header::Header; + use ethcore::spec::Spec; + + #[test] + fn basic_chain() { + let spec = Spec::new_test(); + let genesis_header = spec.genesis_header(); + + let chain = HeaderChain::new(&::rlp::encode(&genesis_header)); + + let mut parent_hash = genesis_header.hash(); + let mut rolling_timestamp = genesis_header.timestamp(); + for i in 1..10000 { + let mut header = Header::new(); + header.set_parent_hash(parent_hash); + header.set_number(i); + header.set_timestamp(rolling_timestamp); + header.set_difficulty(*genesis_header.difficulty() * i.into()); + + chain.insert(::rlp::encode(&header).to_vec()).unwrap(); + + parent_hash = header.hash(); + rolling_timestamp += 10; + } + + assert!(chain.get_header(BlockId::Number(10)).is_none()); + assert!(chain.get_header(BlockId::Number(9000)).is_some()); + assert!(chain.cht_root(2).is_some()); + assert!(chain.cht_root(3).is_none()); + } + + #[test] + fn reorganize() { + let spec = Spec::new_test(); + let genesis_header = spec.genesis_header(); + + let chain = HeaderChain::new(&::rlp::encode(&genesis_header)); + + let mut parent_hash = genesis_header.hash(); + let mut rolling_timestamp = genesis_header.timestamp(); + for i in 1..6 { + let mut header = Header::new(); + header.set_parent_hash(parent_hash); + header.set_number(i); + header.set_timestamp(rolling_timestamp); + header.set_difficulty(*genesis_header.difficulty() * i.into()); + + chain.insert(::rlp::encode(&header).to_vec()).unwrap(); + + parent_hash = header.hash(); + rolling_timestamp += 10; + } + + { + let mut rolling_timestamp = rolling_timestamp; + let mut parent_hash = parent_hash; + for i in 6..16 { + let mut header = Header::new(); + header.set_parent_hash(parent_hash); + header.set_number(i); + header.set_timestamp(rolling_timestamp); + header.set_difficulty(*genesis_header.difficulty() * i.into()); + + chain.insert(::rlp::encode(&header).to_vec()).unwrap(); + + parent_hash = header.hash(); + rolling_timestamp += 10; + } + } + + assert_eq!(chain.best_block().number, 15); + + { + let mut rolling_timestamp = rolling_timestamp; + let mut parent_hash = parent_hash; + + // import a shorter chain which has better TD. + for i in 6..13 { + let mut header = Header::new(); + header.set_parent_hash(parent_hash); + header.set_number(i); + header.set_timestamp(rolling_timestamp); + header.set_difficulty(*genesis_header.difficulty() * (i * i).into()); + + chain.insert(::rlp::encode(&header).to_vec()).unwrap(); + + parent_hash = header.hash(); + rolling_timestamp += 11; + } + } + + assert_eq!(chain.best_block().number, 12); + } +} diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs new file mode 100644 index 000000000..1d32361a7 --- /dev/null +++ b/ethcore/light/src/client/mod.rs @@ -0,0 +1,264 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Light client implementation. Stores data from light sync + +use ethcore::block_import_error::BlockImportError; +use ethcore::block_status::BlockStatus; +use ethcore::client::ClientReport; +use ethcore::ids::BlockId; +use ethcore::header::Header; +use ethcore::verification::queue::{self, HeaderQueue}; +use ethcore::transaction::PendingTransaction; +use ethcore::blockchain_info::BlockChainInfo; +use ethcore::spec::Spec; +use ethcore::service::ClientIoMessage; +use ethcore::encoded; +use io::IoChannel; + +use util::hash::{H256, H256FastMap}; +use util::{Bytes, Mutex, RwLock}; + +use provider::Provider; +use request; + +use self::header_chain::HeaderChain; + +pub use self::service::Service; + +pub mod cht; + +mod header_chain; +mod service; + +/// Configuration for the light client. +#[derive(Debug, Default, Clone)] +pub struct Config { + /// Verification queue config. + pub queue: queue::Config, +} + +/// Trait for interacting with the header chain abstractly. +pub trait LightChainClient: Send + Sync { + /// Get chain info. + fn chain_info(&self) -> BlockChainInfo; + + /// Queue header to be verified. Required that all headers queued have their + /// parent queued prior. + fn queue_header(&self, header: Header) -> Result; + + /// Query whether a block is known. + fn is_known(&self, hash: &H256) -> bool; + + /// Clear the queue. + fn clear_queue(&self); + + /// Get queue info. + fn queue_info(&self) -> queue::QueueInfo; + + /// Get the `i`th CHT root. + fn cht_root(&self, i: usize) -> Option; +} + +/// Light client implementation. +pub struct Client { + queue: HeaderQueue, + chain: HeaderChain, + tx_pool: Mutex>, + report: RwLock, + import_lock: Mutex<()>, +} + +impl Client { + /// Create a new `Client`. + pub fn new(config: Config, spec: &Spec, io_channel: IoChannel) -> Self { + Client { + queue: HeaderQueue::new(config.queue, spec.engine.clone(), io_channel, true), + chain: HeaderChain::new(&::rlp::encode(&spec.genesis_header())), + tx_pool: Mutex::new(Default::default()), + report: RwLock::new(ClientReport::default()), + import_lock: Mutex::new(()), + } + } + + /// Import a header to the queue for additional verification. + pub fn import_header(&self, header: Header) -> Result { + self.queue.import(header).map_err(Into::into) + } + + /// Import a local transaction. + pub fn import_own_transaction(&self, tx: PendingTransaction) { + self.tx_pool.lock().insert(tx.transaction.hash(), tx); + } + + /// Fetch a vector of all pending transactions. + pub fn ready_transactions(&self) -> Vec { + let best_num = self.chain.best_block().number; + self.tx_pool.lock() + .values() + .filter(|t| t.min_block.as_ref().map_or(true, |x| x <= &best_num)) + .cloned() + .collect() + } + + /// Inquire about the status of a given header. + pub fn status(&self, hash: &H256) -> BlockStatus { + match self.queue.status(hash) { + queue::Status::Unknown => self.chain.status(hash), + other => other.into(), + } + } + + /// Get the chain info. + pub fn chain_info(&self) -> BlockChainInfo { + let best_block = self.chain.best_block(); + let first_block = self.chain.first_block(); + let genesis_hash = self.chain.genesis_hash(); + + BlockChainInfo { + total_difficulty: best_block.total_difficulty, + pending_total_difficulty: best_block.total_difficulty, + genesis_hash: genesis_hash, + best_block_hash: best_block.hash, + best_block_number: best_block.number, + ancient_block_hash: if first_block.is_some() { Some(genesis_hash) } else { None }, + ancient_block_number: if first_block.is_some() { Some(0) } else { None }, + first_block_hash: first_block.as_ref().map(|first| first.hash), + first_block_number: first_block.as_ref().map(|first| first.number), + } + } + + /// Get the header queue info. + pub fn queue_info(&self) -> queue::QueueInfo { + self.queue.queue_info() + } + + /// Get a block header by Id. + pub fn get_header(&self, id: BlockId) -> Option { + self.chain.get_header(id) + } + + /// Get the `i`th CHT root. + pub fn cht_root(&self, i: usize) -> Option { + self.chain.cht_root(i) + } + + /// Import a set of pre-verified headers from the queue. + pub fn import_verified(&self) { + const MAX: usize = 256; + + let _lock = self.import_lock.lock(); + + let mut bad = Vec::new(); + let mut good = Vec::new(); + for verified_header in self.queue.drain(MAX) { + let (num, hash) = (verified_header.number(), verified_header.hash()); + + match self.chain.insert(::rlp::encode(&verified_header).to_vec()) { + Ok(()) => { + good.push(hash); + self.report.write().blocks_imported += 1; + } + Err(e) => { + debug!(target: "client", "Error importing header {:?}: {}", (num, hash), e); + bad.push(hash); + } + } + } + + self.queue.mark_as_bad(&bad); + self.queue.mark_as_good(&good); + } + + /// Get a report about blocks imported. + pub fn report(&self) -> ClientReport { + ::std::mem::replace(&mut *self.report.write(), ClientReport::default()) + } + + /// Get blockchain mem usage in bytes. + pub fn chain_mem_used(&self) -> usize { + use util::HeapSizeOf; + + self.chain.heap_size_of_children() + } +} + +impl LightChainClient for Client { + fn chain_info(&self) -> BlockChainInfo { Client::chain_info(self) } + + fn queue_header(&self, header: Header) -> Result { + self.import_header(header) + } + + fn is_known(&self, hash: &H256) -> bool { + self.status(hash) == BlockStatus::InChain + } + + fn clear_queue(&self) { + self.queue.clear() + } + + fn queue_info(&self) -> queue::QueueInfo { + self.queue.queue_info() + } + + fn cht_root(&self, i: usize) -> Option { + Client::cht_root(self, i) + } +} + +// dummy implementation -- may draw from canonical cache further on. +impl Provider for Client { + fn chain_info(&self) -> BlockChainInfo { + Client::chain_info(self) + } + + fn reorg_depth(&self, _a: &H256, _b: &H256) -> Option { + None + } + + fn earliest_state(&self) -> Option { + None + } + + fn block_header(&self, id: BlockId) -> Option { + self.chain.get_header(id).map(encoded::Header::new) + } + + fn block_body(&self, _id: BlockId) -> Option { + None + } + + fn block_receipts(&self, _hash: &H256) -> Option { + None + } + + fn state_proof(&self, _req: request::StateProof) -> Vec { + Vec::new() + } + + fn contract_code(&self, _req: request::ContractCode) -> Bytes { + Vec::new() + } + + fn header_proof(&self, _req: request::HeaderProof) -> Option<(encoded::Header, Vec)> { + None + } + + fn ready_transactions(&self) -> Vec { + Vec::new() + } +} diff --git a/ethcore/light/src/client/service.rs b/ethcore/light/src/client/service.rs new file mode 100644 index 000000000..79c53bac6 --- /dev/null +++ b/ethcore/light/src/client/service.rs @@ -0,0 +1,73 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Minimal IO service for light client. +//! Just handles block import messages and passes them to the client. + +use std::sync::Arc; + +use ethcore::service::ClientIoMessage; +use ethcore::spec::Spec; +use io::{IoContext, IoError, IoHandler, IoService}; + +use super::{Client, Config as ClientConfig}; + +/// Light client service. +pub struct Service { + client: Arc, + _io_service: IoService, +} + +impl Service { + /// Start the service: initialize I/O workers and client itself. + pub fn start(config: ClientConfig, spec: &Spec) -> Result { + let io_service = try!(IoService::::start()); + let client = Arc::new(Client::new(config, spec, io_service.channel())); + try!(io_service.register_handler(Arc::new(ImportBlocks(client.clone())))); + + Ok(Service { + client: client, + _io_service: io_service, + }) + } + + /// Get a handle to the client. + pub fn client(&self) -> &Arc { + &self.client + } +} + +struct ImportBlocks(Arc); + +impl IoHandler for ImportBlocks { + fn message(&self, _io: &IoContext, message: &ClientIoMessage) { + if let ClientIoMessage::BlockVerified = *message { + self.0.import_verified(); + } + } +} + +#[cfg(test)] +mod tests { + use super::Service; + use ethcore::spec::Spec; + + #[test] + fn it_works() { + let spec = Spec::new_test(); + Service::start(Default::default(), &spec).unwrap(); + } +} diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index d59066b82..5cdc3addc 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -25,8 +25,10 @@ //! low-latency applications, but perfectly suitable for simple everyday //! use-cases like sending transactions from a personal account. //! -//! It starts by performing a header-only sync, verifying random samples -//! of members of the chain to varying degrees. +//! The light client performs a header-only sync, doing verification and pruning +//! historical blocks. Upon pruning, batches of 2048 blocks have a number => hash +//! mapping sealed into "canonical hash tries" which can later be used to verify +//! historical block queries from peers. #![deny(missing_docs)] @@ -60,7 +62,8 @@ extern crate ethcore_util as util; extern crate ethcore_network as network; extern crate ethcore_io as io; extern crate rlp; +extern crate smallvec; extern crate time; #[cfg(feature = "ipc")] -extern crate ethcore_ipc as ipc; \ No newline at end of file +extern crate ethcore_ipc as ipc; diff --git a/ethcore/light/src/net/context.rs b/ethcore/light/src/net/context.rs index d9910f958..e95434a3b 100644 --- a/ethcore/light/src/net/context.rs +++ b/ethcore/light/src/net/context.rs @@ -20,7 +20,7 @@ use network::{NetworkContext, PeerId, NodeId}; use super::{Announcement, LightProtocol, ReqId}; use super::error::Error; -use request::Request; +use request::{self, Request}; /// An I/O context which allows sending and receiving packets as well as /// disconnecting peers. This is used as a generalization of the portions @@ -77,12 +77,8 @@ impl<'a> IoContext for NetworkContext<'a> { } } -/// Context for a protocol event. -pub trait EventContext { - /// Get the peer relevant to the event e.g. message sender, - /// disconnected/connected peer. - fn peer(&self) -> PeerId; - +/// Basic context for a the protocol. +pub trait BasicContext { /// Returns the relevant's peer persistent Id (aka NodeId). fn persistent_peer_id(&self, peer: PeerId) -> Option; @@ -93,6 +89,10 @@ pub trait EventContext { // TODO: maybe just put this on a timer in LightProtocol? fn make_announcement(&self, announcement: Announcement); + /// Find the maximum number of requests of a specific type which can be made from + /// supplied peer. + fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize; + /// Disconnect a peer. fn disconnect_peer(&self, peer: PeerId); @@ -100,6 +100,50 @@ pub trait EventContext { fn disable_peer(&self, peer: PeerId); } +/// Context for a protocol event which has a peer ID attached. +pub trait EventContext: BasicContext { + /// Get the peer relevant to the event e.g. message sender, + /// disconnected/connected peer. + fn peer(&self) -> PeerId; + + /// Treat the event context as a basic context. + fn as_basic(&self) -> &BasicContext; +} + +/// Basic context. +pub struct TickCtx<'a> { + /// Io context to enable dispatch. + pub io: &'a IoContext, + /// Protocol implementation. + pub proto: &'a LightProtocol, +} + +impl<'a> BasicContext for TickCtx<'a> { + fn persistent_peer_id(&self, id: PeerId) -> Option { + self.io.persistent_peer_id(id) + } + + fn request_from(&self, peer: PeerId, request: Request) -> Result { + self.proto.request_from(self.io, &peer, request) + } + + fn make_announcement(&self, announcement: Announcement) { + self.proto.make_announcement(self.io, announcement); + } + + fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize { + self.proto.max_requests(peer, kind) + } + + fn disconnect_peer(&self, peer: PeerId) { + self.io.disconnect_peer(peer); + } + + fn disable_peer(&self, peer: PeerId) { + self.io.disable_peer(peer); + } +} + /// Concrete implementation of `EventContext` over the light protocol struct and /// an io context. pub struct Ctx<'a> { @@ -111,11 +155,7 @@ pub struct Ctx<'a> { pub peer: PeerId, } -impl<'a> EventContext for Ctx<'a> { - fn peer(&self) -> PeerId { - self.peer - } - +impl<'a> BasicContext for Ctx<'a> { fn persistent_peer_id(&self, id: PeerId) -> Option { self.io.persistent_peer_id(id) } @@ -128,6 +168,10 @@ impl<'a> EventContext for Ctx<'a> { self.proto.make_announcement(self.io, announcement); } + fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize { + self.proto.max_requests(peer, kind) + } + fn disconnect_peer(&self, peer: PeerId) { self.io.disconnect_peer(peer); } @@ -136,3 +180,13 @@ impl<'a> EventContext for Ctx<'a> { self.io.disable_peer(peer); } } + +impl<'a> EventContext for Ctx<'a> { + fn peer(&self) -> PeerId { + self.peer + } + + fn as_basic(&self) -> &BasicContext { + &*self + } +} diff --git a/ethcore/light/src/net/error.rs b/ethcore/light/src/net/error.rs index 42d038679..6a7746543 100644 --- a/ethcore/light/src/net/error.rs +++ b/ethcore/light/src/net/error.rs @@ -62,6 +62,8 @@ pub enum Error { UnsupportedProtocolVersion(u8), /// Bad protocol version. BadProtocolVersion, + /// Peer is overburdened. + Overburdened, } impl Error { @@ -79,6 +81,7 @@ impl Error { Error::NotServer => Punishment::Disable, Error::UnsupportedProtocolVersion(_) => Punishment::Disable, Error::BadProtocolVersion => Punishment::Disable, + Error::Overburdened => Punishment::None, } } } @@ -107,8 +110,9 @@ impl fmt::Display for Error { Error::UnknownPeer => write!(f, "Unknown peer"), Error::UnsolicitedResponse => write!(f, "Peer provided unsolicited data"), Error::NotServer => write!(f, "Peer not a server."), - Error::UnsupportedProtocolVersion(pv) => write!(f, "Unsupported protocol version: {}", pv), + Error::UnsupportedProtocolVersion(pv) => write!(f, "Unsupported protocol version: {}", pv), Error::BadProtocolVersion => write!(f, "Bad protocol version in handshake"), + Error::Overburdened => write!(f, "Peer overburdened"), } } -} \ No newline at end of file +} diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 18df0f899..20073c0af 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -37,8 +37,8 @@ use provider::Provider; use request::{self, HashOrNumber, Request}; use self::buffer_flow::{Buffer, FlowParams}; -use self::context::Ctx; -use self::error::{Error, Punishment}; +use self::context::{Ctx, TickCtx}; +use self::error::Punishment; mod buffer_flow; mod context; @@ -48,12 +48,16 @@ mod status; #[cfg(test)] mod tests; -pub use self::context::{EventContext, IoContext}; +pub use self::error::Error; +pub use self::context::{BasicContext, EventContext, IoContext}; pub use self::status::{Status, Capabilities, Announcement}; const TIMEOUT: TimerToken = 0; const TIMEOUT_INTERVAL_MS: u64 = 1000; +const TICK_TIMEOUT: TimerToken = 1; +const TICK_TIMEOUT_INTERVAL_MS: u64 = 5000; + // minimum interval between updates. const UPDATE_INTERVAL_MS: i64 = 5000; @@ -131,8 +135,9 @@ struct Peer { status: Status, capabilities: Capabilities, remote_flow: Option<(Buffer, FlowParams)>, - sent_head: H256, // last head we've given them. + sent_head: H256, // last chain head we've given them. last_update: SteadyTime, + idle: bool, // make into a current percentage of max buffer being requested? } impl Peer { @@ -188,7 +193,11 @@ pub trait Handler: Send + Sync { /// Called when a peer responds with header proofs. Each proof is a block header coupled /// with a series of trie nodes is ascending order by distance from the root. fn on_header_proofs(&self, _ctx: &EventContext, _req_id: ReqId, _proofs: &[(Bytes, Vec)]) { } - /// Called on abort. + /// Called to "tick" the handler periodically. + fn tick(&self, _ctx: &BasicContext) { } + /// Called on abort. This signals to handlers that they should clean up + /// and ignore peers. + // TODO: coreresponding `on_activate`? fn on_abort(&self) { } } @@ -253,18 +262,25 @@ impl LightProtocol { } /// Check the maximum amount of requests of a specific type - /// which a peer would be able to serve. - pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> Option { + /// which a peer would be able to serve. Returns zero if the + /// peer is unknown or has no buffer flow parameters. + fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize { self.peers.read().get(&peer).and_then(|peer| { let mut peer = peer.lock(); - match peer.remote_flow.as_mut() { - Some(&mut (ref mut buf, ref flow)) => { + let idle = peer.idle; + match peer.remote_flow { + Some((ref mut buf, ref flow)) => { flow.recharge(buf); - Some(flow.max_amount(&*buf, kind)) + + if !idle { + Some(0) + } else { + Some(flow.max_amount(&*buf, kind)) + } } None => None, } - }) + }).unwrap_or(0) } /// Make a request to a peer. @@ -278,8 +294,10 @@ impl LightProtocol { let peer = peers.get(peer_id).ok_or_else(|| Error::UnknownPeer)?; let mut peer = peer.lock(); - match peer.remote_flow.as_mut() { - Some(&mut (ref mut buf, ref flow)) => { + if !peer.idle { return Err(Error::Overburdened) } + + match peer.remote_flow { + Some((ref mut buf, ref flow)) => { flow.recharge(buf); let max = flow.compute_cost(request.kind(), request.amount()); buf.deduct_cost(max)?; @@ -290,6 +308,8 @@ impl LightProtocol { let req_id = self.req_id.fetch_add(1, Ordering::SeqCst); let packet_data = encode_request(&request, req_id); + trace!(target: "les", "Dispatching request {} to peer {}", req_id, peer_id); + let packet_id = match request.kind() { request::Kind::Headers => packet::GET_BLOCK_HEADERS, request::Kind::Bodies => packet::GET_BLOCK_BODIES, @@ -301,6 +321,7 @@ impl LightProtocol { io.send(*peer_id, packet_id, packet_data); + peer.idle = false; self.pending_requests.write().insert(req_id, Requested { request: request, timestamp: SteadyTime::now(), @@ -404,6 +425,8 @@ impl LightProtocol { match peers.get(peer) { Some(peer_info) => { let mut peer_info = peer_info.lock(); + peer_info.idle = true; + match peer_info.remote_flow.as_mut() { Some(&mut (ref mut buf, ref mut flow)) => { let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit()); @@ -505,6 +528,15 @@ impl LightProtocol { } } } + + fn tick_handlers(&self, io: &IoContext) { + for handler in &self.handlers { + handler.tick(&TickCtx { + io: io, + proto: self, + }) + } + } } impl LightProtocol { @@ -603,6 +635,7 @@ impl LightProtocol { remote_flow: remote_flow, sent_head: pending.sent_head, last_update: pending.last_update, + idle: true, })); for handler in &self.handlers { @@ -1123,7 +1156,10 @@ fn punish(peer: PeerId, io: &IoContext, e: Error) { impl NetworkProtocolHandler for LightProtocol { fn initialize(&self, io: &NetworkContext) { - io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS).expect("Error registering sync timer."); + io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS) + .expect("Error registering sync timer."); + io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL_MS) + .expect("Error registering sync timer."); } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { @@ -1141,6 +1177,7 @@ impl NetworkProtocolHandler for LightProtocol { fn timeout(&self, io: &NetworkContext, timer: TimerToken) { match timer { TIMEOUT => self.timeout_check(io), + TICK_TIMEOUT => self.tick_handlers(io), _ => warn!(target: "les", "received timeout on unknown token {}", timer), } } diff --git a/ethcore/light/src/net/status.rs b/ethcore/light/src/net/status.rs index 533d6b389..d058bc2f2 100644 --- a/ethcore/light/src/net/status.rs +++ b/ethcore/light/src/net/status.rs @@ -562,4 +562,4 @@ mod tests { assert_eq!(read_capabilities, capabilities); assert!(read_flow.is_none()); } -} \ No newline at end of file +} diff --git a/parity/cli/config.full.toml b/parity/cli/config.full.toml index c2456dfcd..088066e63 100644 --- a/parity/cli/config.full.toml +++ b/parity/cli/config.full.toml @@ -12,6 +12,7 @@ base_path = "$HOME/.parity" db_path = "$HOME/.parity/chains" keys_path = "$HOME/.parity/keys" identity = "" +light = false [account] unlock = ["0xdeadbeefcafe0000000000000000000000000000"] diff --git a/parity/cli/usage.txt b/parity/cli/usage.txt index dc7c487ff..7c3e5cc7d 100644 --- a/parity/cli/usage.txt +++ b/parity/cli/usage.txt @@ -36,15 +36,15 @@ Operating Options: (default: {flag_mode_alarm}). --auto-update SET Set a releases set to automatically update and install. - all - All updates in the our release track. + all - All updates in the our release track. critical - Only consensus/security updates. - none - No updates will be auto-installed. + none - No updates will be auto-installed. (default: {flag_auto_update}). --release-track TRACK Set which release track we should use for updates. - stable - Stable releases. - beta - Beta releases. + stable - Stable releases. + beta - Beta releases. nightly - Nightly releases (unstable). - testing - Testing releases (do not use). + testing - Testing releases (do not use). current - Whatever track this executable was released on (default: {flag_release_track}). --no-download Normally new releases will be downloaded ready for @@ -362,7 +362,7 @@ Legacy Options: --cache MB Equivalent to --cache-size MB. Internal Options: - --can-restart Executable will auto-restart if exiting with 69. + --can-restart Executable will auto-restart if exiting with 69. Miscellaneous Options: -c --config CONFIG Specify a filename containing a configuration file. diff --git a/parity/configuration.rs b/parity/configuration.rs index 23adffe66..85ff61d5c 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -690,7 +690,7 @@ impl Configuration { "none" => UpdateFilter::None, "critical" => UpdateFilter::Critical, "all" => UpdateFilter::All, - _ => return Err("Invalid value for `--auto-update`. See `--help` for more information.".into()), + _ => return Err("Invalid value for `--auto-update`. See `--help` for more information.".into()), }, track: match self.args.flag_release_track.as_ref() { "stable" => ReleaseTrack::Stable, @@ -698,7 +698,7 @@ impl Configuration { "nightly" => ReleaseTrack::Nightly, "testing" => ReleaseTrack::Testing, "current" => ReleaseTrack::Unknown, - _ => return Err("Invalid value for `--releases-track`. See `--help` for more information.".into()), + _ => return Err("Invalid value for `--releases-track`. See `--help` for more information.".into()), }, path: default_hypervisor_path(), }) diff --git a/parity/run.rs b/parity/run.rs index 9c31561eb..a878c2aae 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -32,12 +32,10 @@ use ethcore::verification::queue::VerifierSettings; use ethsync::SyncConfig; use informant::Informant; use updater::{UpdatePolicy, Updater}; -use parity_reactor::{EventLoop, EventLoopHandle}; +use parity_reactor::EventLoop; use hash_fetch::fetch::{Fetch, Client as FetchClient}; -use rpc::{HttpServer, IpcServer, HttpConfiguration, IpcConfiguration}; -use signer::SignerServer; -use dapps::WebappServer; +use rpc::{HttpConfiguration, IpcConfiguration}; use params::{ SpecType, Pruning, AccountsConfig, GasPricerConfig, MinerExtras, Switch, tracing_switch_to_bool, fatdb_switch_to_bool, mode_switch_to_bool @@ -444,16 +442,10 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R } // Handle exit - let restart = wait_for_exit( - panic_handler, - http_server, - ipc_server, - dapps_server, - signer_server, - event_loop.into(), - updater, - can_restart, - ); + let restart = wait_for_exit(panic_handler, Some(updater), can_restart); + + // drop this stuff as soon as exit detected. + drop((http_server, ipc_server, dapps_server, signer_server, event_loop)); info!("Finishing work, please wait..."); @@ -523,12 +515,7 @@ fn build_create_account_hint(spec: &SpecType, keys: &str) -> String { fn wait_for_exit( panic_handler: Arc, - _http_server: Option, - _ipc_server: Option, - _dapps_server: Option, - _signer_server: Option, - _event_loop: EventLoopHandle, - updater: Arc, + updater: Option>, can_restart: bool ) -> bool { let exit = Arc::new((Mutex::new(false), Condvar::new())); @@ -541,12 +528,14 @@ fn wait_for_exit( let e = exit.clone(); panic_handler.on_panic(move |_reason| { e.1.notify_all(); }); - // Handle updater wanting to restart us - if can_restart { - let e = exit.clone(); - updater.set_exit_handler(move || { *e.0.lock() = true; e.1.notify_all(); }); - } else { - updater.set_exit_handler(|| info!("Update installed; ready for restart.")); + if let Some(updater) = updater { + // Handle updater wanting to restart us + if can_restart { + let e = exit.clone(); + updater.set_exit_handler(move || { *e.0.lock() = true; e.1.notify_all(); }); + } else { + updater.set_exit_handler(|| info!("Update installed; ready for restart.")); + } } // Wait for signal diff --git a/sync/src/api.rs b/sync/src/api.rs index 6237b3dc1..6c2c43db4 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -33,6 +33,8 @@ use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig}; use std::str::FromStr; use parking_lot::RwLock; use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT}; +use light::client::LightChainClient; +use light::Provider; use light::net::{LightProtocol, Params as LightParams, Capabilities, Handler as LightHandler, EventContext}; /// Parity sync protocol @@ -567,3 +569,101 @@ pub struct ServiceConfiguration { /// IPC path. pub io_path: String, } + +/// Configuration for the light sync. +pub struct LightSyncParams { + /// Network configuration. + pub network_config: BasicNetworkConfiguration, + /// Light client to sync to. + pub client: Arc, + /// Network ID. + pub network_id: u64, + /// Subprotocol name. + pub subprotocol_name: [u8; 3], +} + +/// Service for light synchronization. +pub struct LightSync { + proto: Arc, + network: NetworkService, + subprotocol_name: [u8; 3], +} + +impl LightSync { + /// Create a new light sync service. + pub fn new(params: LightSyncParams) -> Result + where L: LightChainClient + Provider + 'static + { + use light_sync::LightSync as SyncHandler; + + // initialize light protocol handler and attach sync module. + let light_proto = { + let light_params = LightParams { + network_id: params.network_id, + flow_params: Default::default(), // or `None`? + capabilities: Capabilities { + serve_headers: false, + serve_chain_since: None, + serve_state_since: None, + tx_relay: false, + }, + }; + + let mut light_proto = LightProtocol::new(params.client.clone(), light_params); + let sync_handler = try!(SyncHandler::new(params.client.clone())); + light_proto.add_handler(Box::new(sync_handler)); + + Arc::new(light_proto) + }; + + let service = try!(NetworkService::new(params.network_config)); + + Ok(LightSync { + proto: light_proto, + network: service, + subprotocol_name: params.subprotocol_name, + }) + } +} + +impl ManageNetwork for LightSync { + fn accept_unreserved_peers(&self) { + self.network.set_non_reserved_mode(NonReservedPeerMode::Accept); + } + + fn deny_unreserved_peers(&self) { + self.network.set_non_reserved_mode(NonReservedPeerMode::Deny); + } + + fn remove_reserved_peer(&self, peer: String) -> Result<(), String> { + self.network.remove_reserved_peer(&peer).map_err(|e| format!("{:?}", e)) + } + + fn add_reserved_peer(&self, peer: String) -> Result<(), String> { + self.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e)) + } + + fn start_network(&self) { + match self.network.start() { + Err(NetworkError::StdIo(ref e)) if e.kind() == io::ErrorKind::AddrInUse => warn!("Network port {:?} is already in use, make sure that another instance of an Ethereum client is not running or change the port using the --port option.", self.network.config().listen_address.expect("Listen address is not set.")), + Err(err) => warn!("Error starting network: {}", err), + _ => {}, + } + + let light_proto = self.proto.clone(); + + self.network.register_protocol(light_proto, self.subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS) + .unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e)); + } + + fn stop_network(&self) { + self.proto.abort(); + if let Err(e) = self.network.stop() { + warn!("Error stopping network: {}", e); + } + } + + fn network_config(&self) -> NetworkConfiguration { + NetworkConfiguration::from(self.network.config().clone()) + } +} diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 0ff3bfe66..9b3cdab5e 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -58,6 +58,8 @@ mod sync_io; mod snapshot; mod transactions_stats; +pub mod light_sync; + #[cfg(test)] mod tests; @@ -70,8 +72,11 @@ mod api { #[cfg(not(feature = "ipc"))] mod api; -pub use api::{EthSync, Params, SyncProvider, ManageNetwork, SyncConfig, - ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP, TransactionStats}; +pub use api::{ + EthSync, Params, SyncProvider, ManageNetwork, SyncConfig, + ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP, TransactionStats, + LightSync, LightSyncParams, +}; pub use chain::{SyncStatus, SyncState}; pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError}; diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs new file mode 100644 index 000000000..226b1fdff --- /dev/null +++ b/sync/src/light_sync/mod.rs @@ -0,0 +1,459 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Light client synchronization. +//! +//! This will synchronize the header chain using LES messages. +//! Dataflow is largely one-directional as headers are pushed into +//! the light client queue for import. Where possible, they are batched +//! in groups. +//! +//! This is written assuming that the client and sync service are running +//! in the same binary; unlike a full node which might communicate via IPC. + +use std::collections::HashMap; +use std::mem; +use std::sync::Arc; + +use light::client::LightChainClient; +use light::net::{ + Announcement, Handler, BasicContext, EventContext, + Capabilities, ReqId, Status, +}; +use light::request; +use network::PeerId; +use util::{Bytes, U256, H256, Mutex, RwLock}; +use rand::{Rng, OsRng}; + +use self::sync_round::{AbortReason, SyncRound, ResponseContext}; + +mod response; +mod sync_round; + +/// Peer chain info. +#[derive(Clone)] +struct ChainInfo { + head_td: U256, + head_hash: H256, + head_num: u64, +} + +struct Peer { + status: ChainInfo, +} + +impl Peer { + // Create a new peer. + fn new(chain_info: ChainInfo) -> Self { + Peer { + status: chain_info, + } + } +} +// search for a common ancestor with the best chain. +enum AncestorSearch { + Queued(u64), // queued to search for blocks starting from here. + Awaiting(ReqId, u64, request::Headers), // awaiting response for this request. + Prehistoric, // prehistoric block found. TODO: start to roll back CHTs. + FoundCommon(u64, H256), // common block found. + Genesis, // common ancestor is the genesis. +} + +impl AncestorSearch { + fn begin(best_num: u64) -> Self { + match best_num { + 0 => AncestorSearch::Genesis, + _ => AncestorSearch::Queued(best_num), + } + } + + fn process_response(self, ctx: &ResponseContext, client: &L) -> AncestorSearch + where L: LightChainClient + { + let first_num = client.chain_info().first_block_number.unwrap_or(0); + match self { + AncestorSearch::Awaiting(id, start, req) => { + if &id == ctx.req_id() { + match response::decode_and_verify(ctx.data(), &req) { + Ok(headers) => { + for header in &headers { + if client.is_known(&header.hash()) { + debug!(target: "sync", "Found common ancestor with best chain"); + return AncestorSearch::FoundCommon(header.number(), header.hash()); + } + + if header.number() <= first_num { + debug!(target: "sync", "Prehistoric common ancestor with best chain."); + return AncestorSearch::Prehistoric; + } + } + + AncestorSearch::Queued(start - headers.len() as u64) + } + Err(e) => { + trace!(target: "sync", "Bad headers response from {}: {}", ctx.responder(), e); + + ctx.punish_responder(); + AncestorSearch::Queued(start) + } + } + } else { + AncestorSearch::Awaiting(id, start, req) + } + } + other => other, + } + } + + fn dispatch_request(self, mut dispatcher: F) -> AncestorSearch + where F: FnMut(request::Headers) -> Option + { + const BATCH_SIZE: usize = 64; + + match self { + AncestorSearch::Queued(start) => { + let req = request::Headers { + start: start.into(), + max: ::std::cmp::min(start as usize, BATCH_SIZE), + skip: 0, + reverse: true, + }; + + match dispatcher(req.clone()) { + Some(req_id) => AncestorSearch::Awaiting(req_id, start, req), + None => AncestorSearch::Queued(start), + } + } + other => other, + } + } +} + +// synchronization state machine. +enum SyncState { + // Idle (waiting for peers) + Idle, + // searching for common ancestor with best chain. + // queue should be cleared at this phase. + AncestorSearch(AncestorSearch), + // Doing sync rounds. + Rounds(SyncRound), +} + +struct ResponseCtx<'a> { + peer: PeerId, + req_id: ReqId, + ctx: &'a BasicContext, + data: &'a [Bytes], +} + +impl<'a> ResponseContext for ResponseCtx<'a> { + fn responder(&self) -> PeerId { self.peer } + fn req_id(&self) -> &ReqId { &self.req_id } + fn data(&self) -> &[Bytes] { self.data } + fn punish_responder(&self) { self.ctx.disable_peer(self.peer) } +} + +/// Light client synchronization manager. See module docs for more details. +pub struct LightSync { + best_seen: Mutex>, // best seen block on the network. + peers: RwLock>>, // peers which are relevant to synchronization. + client: Arc, + rng: Mutex, + state: Mutex, +} + +impl Handler for LightSync { + fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { + let our_best = self.client.chain_info().best_block_number; + + if !capabilities.serve_headers || status.head_num <= our_best { + trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer()); + ctx.disconnect_peer(ctx.peer()); + return; + } + + let chain_info = ChainInfo { + head_td: status.head_td, + head_hash: status.head_hash, + head_num: status.head_num, + }; + + { + let mut best = self.best_seen.lock(); + if best.as_ref().map_or(true, |b| status.head_td > b.1) { + *best = Some((status.head_hash, status.head_td)); + } + } + + self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info))); + self.maintain_sync(ctx.as_basic()); + } + + fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { + let peer_id = ctx.peer(); + + let peer = match self.peers.write().remove(&peer_id).map(|p| p.into_inner()) { + Some(peer) => peer, + None => return, + }; + + trace!(target: "sync", "peer {} disconnecting", peer_id); + + let new_best = { + let mut best = self.best_seen.lock(); + let peer_best = (peer.status.head_hash, peer.status.head_td); + + if best.as_ref().map_or(false, |b| b == &peer_best) { + // search for next-best block. + let next_best: Option<(H256, U256)> = self.peers.read().values() + .map(|p| p.lock()) + .map(|p| (p.status.head_hash, p.status.head_td)) + .fold(None, |acc, x| match acc { + Some(acc) => if x.1 > acc.1 { Some(x) } else { Some(acc) }, + None => Some(x), + }); + + *best = next_best; + } + + best.clone() + }; + + if new_best.is_none() { + debug!(target: "sync", "No peers remain. Reverting to idle"); + *self.state.lock() = SyncState::Idle; + } else { + let mut state = self.state.lock(); + + *state = match mem::replace(&mut *state, SyncState::Idle) { + SyncState::Idle => SyncState::Idle, + SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search), + SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(unfulfilled)), + }; + } + + self.maintain_sync(ctx.as_basic()); + } + + fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { + let last_td = { + let peers = self.peers.read(); + match peers.get(&ctx.peer()) { + None => return, + Some(peer) => { + let mut peer = peer.lock(); + let last_td = peer.status.head_td; + peer.status = ChainInfo { + head_td: announcement.head_td, + head_hash: announcement.head_hash, + head_num: announcement.head_num, + }; + last_td + } + } + }; + + trace!(target: "sync", "Announcement from peer {}: new chain head {:?}, reorg depth {}", + ctx.peer(), (announcement.head_hash, announcement.head_num), announcement.reorg_depth); + + if last_td > announcement.head_td { + trace!(target: "sync", "Peer {} moved backwards.", ctx.peer()); + self.peers.write().remove(&ctx.peer()); + ctx.disconnect_peer(ctx.peer()); + } + + { + let mut best = self.best_seen.lock(); + if best.as_ref().map_or(true, |b| announcement.head_td > b.1) { + *best = Some((announcement.head_hash, announcement.head_td)); + } + } + + self.maintain_sync(ctx.as_basic()); + } + + fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) { + if !self.peers.read().contains_key(&ctx.peer()) { + return + } + + { + let mut state = self.state.lock(); + + let ctx = ResponseCtx { + peer: ctx.peer(), + req_id: req_id, + ctx: ctx.as_basic(), + data: headers, + }; + + *state = match mem::replace(&mut *state, SyncState::Idle) { + SyncState::Idle => SyncState::Idle, + SyncState::AncestorSearch(search) => + SyncState::AncestorSearch(search.process_response(&ctx, &*self.client)), + SyncState::Rounds(round) => SyncState::Rounds(round.process_response(&ctx)), + }; + } + + self.maintain_sync(ctx.as_basic()); + } + + fn tick(&self, ctx: &BasicContext) { + self.maintain_sync(ctx); + } +} + +// private helpers +impl LightSync { + // Begins a search for the common ancestor and our best block. + // does not lock state, instead has a mutable reference to it passed. + fn begin_search(&self, state: &mut SyncState) { + if let None = *self.best_seen.lock() { + // no peers. + *state = SyncState::Idle; + return; + } + + trace!(target: "sync", "Beginning search for common ancestor"); + self.client.clear_queue(); + let chain_info = self.client.chain_info(); + + *state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number)); + } + + fn maintain_sync(&self, ctx: &BasicContext) { + const DRAIN_AMOUNT: usize = 128; + + debug!(target: "sync", "Maintaining sync."); + + let mut state = self.state.lock(); + + // drain any pending blocks into the queue. + { + let mut sink = Vec::with_capacity(DRAIN_AMOUNT); + + 'a: + loop { + let queue_info = self.client.queue_info(); + if queue_info.is_full() { break } + + *state = match mem::replace(&mut *state, SyncState::Idle) { + SyncState::Rounds(round) + => SyncState::Rounds(round.drain(&mut sink, Some(DRAIN_AMOUNT))), + other => other, + }; + + if sink.is_empty() { break } + + for header in sink.drain(..) { + if let Err(e) = self.client.queue_header(header) { + debug!(target: "sync", "Found bad header ({:?}). Reset to search state.", e); + + self.begin_search(&mut state); + break 'a; + } + } + } + } + + // handle state transitions. + { + match mem::replace(&mut *state, SyncState::Idle) { + SyncState::Rounds(SyncRound::Abort(reason)) => { + match reason { + AbortReason::BadScaffold(bad_peers) => { + debug!(target: "sync", "Disabling peers responsible for bad scaffold"); + for peer in bad_peers { + ctx.disable_peer(peer); + } + } + AbortReason::NoResponses => {} + } + + debug!(target: "sync", "Beginning search after aborted sync round"); + self.begin_search(&mut state); + } + SyncState::AncestorSearch(AncestorSearch::FoundCommon(num, hash)) => { + // TODO: compare to best block and switch to another downloading + // method when close. + *state = SyncState::Rounds(SyncRound::begin(num, hash)); + } + SyncState::AncestorSearch(AncestorSearch::Genesis) => { + // Same here. + let g_hash = self.client.chain_info().genesis_hash; + *state = SyncState::Rounds(SyncRound::begin(0, g_hash)); + } + SyncState::Idle => self.begin_search(&mut state), + other => *state = other, // restore displaced state. + } + } + + // allow dispatching of requests. + // TODO: maybe wait until the amount of cumulative requests remaining is high enough + // to avoid pumping the failure rate. + { + let peers = self.peers.read(); + let mut peer_ids: Vec<_> = peers.keys().cloned().collect(); + let mut rng = self.rng.lock(); + + // naive request dispatcher: just give to any peer which says it will + // give us responses. + let dispatcher = move |req: request::Headers| { + rng.shuffle(&mut peer_ids); + + for peer in &peer_ids { + if ctx.max_requests(*peer, request::Kind::Headers) >= req.max { + match ctx.request_from(*peer, request::Request::Headers(req.clone())) { + Ok(id) => { + return Some(id) + } + Err(e) => + trace!(target: "sync", "Error requesting headers from viable peer: {}", e), + } + } + } + + None + }; + + *state = match mem::replace(&mut *state, SyncState::Idle) { + SyncState::Rounds(round) => + SyncState::Rounds(round.dispatch_requests(dispatcher)), + SyncState::AncestorSearch(search) => + SyncState::AncestorSearch(search.dispatch_request(dispatcher)), + other => other, + }; + } + } +} + +// public API +impl LightSync { + /// Create a new instance of `LightSync`. + /// + /// This won't do anything until registered as a handler + /// so it can act on events. + pub fn new(client: Arc) -> Result { + Ok(LightSync { + best_seen: Mutex::new(None), + peers: RwLock::new(HashMap::new()), + client: client, + rng: Mutex::new(try!(OsRng::new())), + state: Mutex::new(SyncState::Idle), + }) + } +} diff --git a/sync/src/light_sync/response.rs b/sync/src/light_sync/response.rs new file mode 100644 index 000000000..907db763e --- /dev/null +++ b/sync/src/light_sync/response.rs @@ -0,0 +1,254 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Helpers for decoding and verifying responses for headers. + +use std::fmt; + +use ethcore::header::Header; +use light::request::{HashOrNumber, Headers as HeadersRequest}; +use rlp::{DecoderError, UntrustedRlp, View}; +use util::{Bytes, H256}; + +/// Errors found when decoding headers and verifying with basic constraints. +#[derive(Debug, PartialEq)] +pub enum BasicError { + /// Wrong skip value: expected, found (if any). + WrongSkip(u64, Option), + /// Wrong start number. + WrongStartNumber(u64, u64), + /// Wrong start hash. + WrongStartHash(H256, H256), + /// Too many headers. + TooManyHeaders(usize, usize), + /// Decoder error. + Decoder(DecoderError), +} + +impl From for BasicError { + fn from(err: DecoderError) -> Self { + BasicError::Decoder(err) + } +} + +impl fmt::Display for BasicError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + try!(write!(f, "Header response verification error: ")); + + match *self { + BasicError::WrongSkip(ref exp, ref got) + => write!(f, "wrong skip (expected {}, got {:?})", exp, got), + BasicError::WrongStartNumber(ref exp, ref got) + => write!(f, "wrong start number (expected {}, got {})", exp, got), + BasicError::WrongStartHash(ref exp, ref got) + => write!(f, "wrong start hash (expected {}, got {})", exp, got), + BasicError::TooManyHeaders(ref max, ref got) + => write!(f, "too many headers (max {}, got {})", max, got), + BasicError::Decoder(ref err) + => write!(f, "{}", err), + } + } +} + +/// Request verification constraint. +pub trait Constraint { + type Error; + + /// Verify headers against this. + fn verify(&self, headers: &[Header], reverse: bool) -> Result<(), Self::Error>; +} + +/// Decode a response and do basic verification against a request. +pub fn decode_and_verify(headers: &[Bytes], request: &HeadersRequest) -> Result, BasicError> { + let headers: Vec<_> = try!(headers.iter().map(|x| UntrustedRlp::new(&x).as_val()).collect()); + + let reverse = request.reverse; + + try!(Max(request.max).verify(&headers, reverse)); + match request.start { + HashOrNumber::Number(ref num) => try!(StartsAtNumber(*num).verify(&headers, reverse)), + HashOrNumber::Hash(ref hash) => try!(StartsAtHash(*hash).verify(&headers, reverse)), + } + + try!(SkipsBetween(request.skip).verify(&headers, reverse)); + + Ok(headers) +} + +struct StartsAtNumber(u64); +struct StartsAtHash(H256); +struct SkipsBetween(u64); +struct Max(usize); + +impl Constraint for StartsAtNumber { + type Error = BasicError; + + fn verify(&self, headers: &[Header], _reverse: bool) -> Result<(), BasicError> { + headers.first().map_or(Ok(()), |h| { + if h.number() == self.0 { + Ok(()) + } else { + Err(BasicError::WrongStartNumber(self.0, h.number())) + } + }) + } +} + +impl Constraint for StartsAtHash { + type Error = BasicError; + + fn verify(&self, headers: &[Header], _reverse: bool) -> Result<(), BasicError> { + headers.first().map_or(Ok(()), |h| { + if h.hash() == self.0 { + Ok(()) + } else { + Err(BasicError::WrongStartHash(self.0, h.hash())) + } + }) + } +} + +impl Constraint for SkipsBetween { + type Error = BasicError; + + fn verify(&self, headers: &[Header], reverse: bool) -> Result<(), BasicError> { + for pair in headers.windows(2) { + let (low, high) = if reverse { (&pair[1], &pair[0]) } else { (&pair[0], &pair[1]) }; + if low.number() >= high.number() { return Err(BasicError::WrongSkip(self.0, None)) } + + let skip = (high.number() - low.number()) - 1; + if skip != self.0 { return Err(BasicError::WrongSkip(self.0, Some(skip))) } + } + + Ok(()) + } +} + +impl Constraint for Max { + type Error = BasicError; + + fn verify(&self, headers: &[Header], _reverse: bool) -> Result<(), BasicError> { + match headers.len() > self.0 { + true => Err(BasicError::TooManyHeaders(self.0, headers.len())), + false => Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + use ethcore::header::Header; + use light::request::Headers as HeadersRequest; + + use super::*; + + #[test] + fn sequential_forward() { + let request = HeadersRequest { + start: 10.into(), + max: 30, + skip: 0, + reverse: false, + }; + + let mut parent_hash = None; + let headers: Vec<_> = (0..25).map(|x| x + 10).map(|x| { + let mut header = Header::default(); + header.set_number(x); + + if let Some(parent_hash) = parent_hash { + header.set_parent_hash(parent_hash); + } + + parent_hash = Some(header.hash()); + + ::rlp::encode(&header).to_vec() + }).collect(); + + assert!(decode_and_verify(&headers, &request).is_ok()); + } + + #[test] + fn sequential_backward() { + let request = HeadersRequest { + start: 34.into(), + max: 30, + skip: 0, + reverse: true, + }; + + let mut parent_hash = None; + let headers: Vec<_> = (0..25).map(|x| x + 10).rev().map(|x| { + let mut header = Header::default(); + header.set_number(x); + + if let Some(parent_hash) = parent_hash { + header.set_parent_hash(parent_hash); + } + + parent_hash = Some(header.hash()); + + ::rlp::encode(&header).to_vec() + }).collect(); + + assert!(decode_and_verify(&headers, &request).is_ok()); + } + + #[test] + fn too_many() { + let request = HeadersRequest { + start: 10.into(), + max: 20, + skip: 0, + reverse: false, + }; + + let mut parent_hash = None; + let headers: Vec<_> = (0..25).map(|x| x + 10).map(|x| { + let mut header = Header::default(); + header.set_number(x); + + if let Some(parent_hash) = parent_hash { + header.set_parent_hash(parent_hash); + } + + parent_hash = Some(header.hash()); + + ::rlp::encode(&header).to_vec() + }).collect(); + + assert_eq!(decode_and_verify(&headers, &request), Err(BasicError::TooManyHeaders(20, 25))); + } + + #[test] + fn wrong_skip() { + let request = HeadersRequest { + start: 10.into(), + max: 30, + skip: 5, + reverse: false, + }; + + let headers: Vec<_> = (0..25).map(|x| x * 3).map(|x| x + 10).map(|x| { + let mut header = Header::default(); + header.set_number(x); + + ::rlp::encode(&header).to_vec() + }).collect(); + + assert_eq!(decode_and_verify(&headers, &request), Err(BasicError::WrongSkip(5, Some(2)))); + } +} diff --git a/sync/src/light_sync/sync_round.rs b/sync/src/light_sync/sync_round.rs new file mode 100644 index 000000000..dc1927aae --- /dev/null +++ b/sync/src/light_sync/sync_round.rs @@ -0,0 +1,456 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Header download state machine. + +use std::cmp::Ordering; +use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; + +use ethcore::header::Header; + +use light::net::ReqId; +use light::request::Headers as HeadersRequest; + +use network::PeerId; +use util::{Bytes, H256}; + +use super::response; + +// amount of blocks between each scaffold entry. +// TODO: move these into parameters for `RoundStart::new`? +const ROUND_SKIP: u64 = 255; + +// amount of scaffold frames: these are the blank spaces in "X___X___X" +const ROUND_FRAMES: usize = 255; + +// number of attempts to make to get a full scaffold for a sync round. +const SCAFFOLD_ATTEMPTS: usize = 3; + +/// Context for a headers response. +pub trait ResponseContext { + /// Get the peer who sent this response. + fn responder(&self) -> PeerId; + /// Get the request ID this response corresponds to. + fn req_id(&self) -> &ReqId; + /// Get the (unverified) response data. + fn data(&self) -> &[Bytes]; + /// Punish the responder. + fn punish_responder(&self); +} + +/// Reasons for sync round abort. +#[derive(Debug, Clone)] +pub enum AbortReason { + /// Bad sparse header chain along with a list of peers who contributed to it. + BadScaffold(Vec), + /// No incoming data. + NoResponses, +} + +// A request for headers with a known starting header hash. +// and a known parent hash for the last block. +#[derive(PartialEq, Eq)] +struct SubchainRequest { + subchain_parent: (u64, H256), + headers_request: HeadersRequest, + subchain_end: (u64, H256), + downloaded: VecDeque
, +} + +// ordered by subchain parent number so pending requests towards the +// front of the round are dispatched first. +impl PartialOrd for SubchainRequest { + fn partial_cmp(&self, other: &Self) -> Option { + self.subchain_parent.0 + .partial_cmp(&other.subchain_parent.0) + .map(Ordering::reverse) + } +} + +impl Ord for SubchainRequest { + fn cmp(&self, other: &Self) -> Ordering { + self.subchain_parent.0.cmp(&other.subchain_parent.0).reverse() + } +} + +/// Manages downloading of interior blocks of a sparse header chain. +pub struct Fetcher { + sparse: VecDeque
, // sparse header chain. + requests: BinaryHeap, + complete_requests: HashMap, + pending: HashMap, + scaffold_contributors: Vec, + ready: VecDeque
, + end: (u64, H256), +} + +impl Fetcher { + // Produce a new fetcher given a sparse headerchain, in ascending order along + // with a list of peers who helped produce the chain. + // The headers must be valid RLP at this point and must have a consistent + // non-zero gap between them. Will abort the round if found wrong. + fn new(sparse_headers: Vec
, contributors: Vec) -> SyncRound { + let mut requests = BinaryHeap::with_capacity(sparse_headers.len() - 1); + + for pair in sparse_headers.windows(2) { + let low_rung = &pair[0]; + let high_rung = &pair[1]; + + let diff = high_rung.number() - low_rung.number(); + + // should never happen as long as we verify the gaps + // gotten from SyncRound::Start + if diff < 2 { continue } + + let needed_headers = HeadersRequest { + start: high_rung.parent_hash().clone().into(), + max: diff as usize - 1, + skip: 0, + reverse: true, + }; + + requests.push(SubchainRequest { + headers_request: needed_headers, + subchain_end: (high_rung.number() - 1, *high_rung.parent_hash()), + downloaded: VecDeque::new(), + subchain_parent: (low_rung.number(), low_rung.hash()), + }); + } + + let end = match sparse_headers.last().map(|h| (h.number(), h.hash())) { + Some(end) => end, + None => return SyncRound::abort(AbortReason::BadScaffold(contributors)), + }; + + SyncRound::Fetch(Fetcher { + sparse: sparse_headers.into(), + requests: requests, + complete_requests: HashMap::new(), + pending: HashMap::new(), + scaffold_contributors: contributors, + ready: VecDeque::new(), + end: end, + }) + } + + // collect complete requests and their subchain from the sparse header chain + // into the ready set in order. + fn collect_ready(&mut self) { + loop { + let start_hash = match self.sparse.front() { + Some(first) => first.hash(), + None => break, + }; + + match self.complete_requests.remove(&start_hash) { + None => break, + Some(complete_req) => { + self.ready.push_back(self.sparse.pop_front().expect("first known to exist; qed")); + self.ready.extend(complete_req.downloaded); + } + } + } + + // frames are between two sparse headers and keyed by subchain parent, so the last + // remaining will be the last header. + if self.sparse.len() == 1 { + self.ready.push_back(self.sparse.pop_back().expect("sparse known to have one entry; qed")) + } + } + + fn process_response(mut self, ctx: &R) -> SyncRound { + let mut request = match self.pending.remove(ctx.req_id()) { + Some(request) => request, + None => return SyncRound::Fetch(self), + }; + + let headers = ctx.data(); + + if headers.len() == 0 { + trace!(target: "sync", "Punishing peer {} for empty response", ctx.responder()); + ctx.punish_responder(); + return SyncRound::Fetch(self); + } + + match response::decode_and_verify(headers, &request.headers_request) { + Err(e) => { + trace!(target: "sync", "Punishing peer {} for invalid response ({})", ctx.responder(), e); + ctx.punish_responder(); + + // TODO: track number of attempts per request, + // abort if failure rate too high. + self.requests.push(request); + SyncRound::Fetch(self) + } + Ok(headers) => { + let mut parent_hash = None; + for header in headers { + if parent_hash.as_ref().map_or(false, |h| h != &header.hash()) { + trace!(target: "sync", "Punishing peer {} for parent mismatch", ctx.responder()); + ctx.punish_responder(); + + self.requests.push(request); + return SyncRound::Fetch(self); + } + + // incrementally update the frame request as we go so we can + // return at any time in the loop. + parent_hash = Some(header.parent_hash().clone()); + request.headers_request.start = header.parent_hash().clone().into(); + request.headers_request.max -= 1; + + request.downloaded.push_front(header); + } + + let subchain_parent = request.subchain_parent.1; + + if request.headers_request.max == 0 { + if parent_hash.map_or(true, |hash| hash != subchain_parent) { + let abort = AbortReason::BadScaffold(self.scaffold_contributors); + return SyncRound::Abort(abort); + } + + self.complete_requests.insert(subchain_parent, request); + self.collect_ready(); + } + + // state transition not triggered until drain is finished. + (SyncRound::Fetch(self)) + } + } + } + + fn requests_abandoned(mut self, abandoned: &[ReqId]) -> SyncRound { + for abandoned in abandoned { + match self.pending.remove(abandoned) { + None => {}, + Some(req) => self.requests.push(req), + } + } + + // TODO: track failure rate and potentially abort. + SyncRound::Fetch(self) + } + + fn dispatch_requests(mut self, mut dispatcher: D) -> SyncRound + where D: FnMut(HeadersRequest) -> Option + { + while let Some(pending_req) = self.requests.pop() { + match dispatcher(pending_req.headers_request.clone()) { + Some(req_id) => { + trace!(target: "sync", "Assigned request for subchain ({} -> {})", + pending_req.subchain_parent.0 + 1, pending_req.subchain_end.0); + + self.pending.insert(req_id, pending_req); + } + None => { + self.requests.push(pending_req); + break; + } + } + } + + SyncRound::Fetch(self) + } + + fn drain(mut self, headers: &mut Vec
, max: Option) -> SyncRound { + let max = ::std::cmp::min(max.unwrap_or(usize::max_value()), self.ready.len()); + headers.extend(self.ready.drain(0..max)); + + if self.sparse.is_empty() && self.ready.is_empty() { + SyncRound::Start(RoundStart::new(self.end)) + } else { + SyncRound::Fetch(self) + } + } +} + +/// Round started: get stepped header chain. +/// from a start block with number X we request 256 headers stepped by 256 from +/// block X + 1. +pub struct RoundStart { + start_block: (u64, H256), + pending_req: Option<(ReqId, HeadersRequest)>, + sparse_headers: Vec
, + contributors: HashSet, + attempt: usize, +} + +impl RoundStart { + fn new(start: (u64, H256)) -> Self { + RoundStart { + start_block: start.clone(), + pending_req: None, + sparse_headers: Vec::new(), + contributors: HashSet::new(), + attempt: 0, + } + } + + // called on failed attempt. may trigger a transition after a number of attempts. + // a failed attempt is defined as any time a peer returns invalid or incomplete response + fn failed_attempt(mut self) -> SyncRound { + self.attempt += 1; + + if self.attempt >= SCAFFOLD_ATTEMPTS { + if self.sparse_headers.len() > 1 { + Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect()) + } else { + SyncRound::Abort(AbortReason::NoResponses) + } + } else { + SyncRound::Start(self) + } + } + + fn process_response(mut self, ctx: &R) -> SyncRound { + let req = match self.pending_req.take() { + Some((id, ref req)) if ctx.req_id() == &id => { req.clone() } + other => { + self.pending_req = other; + return SyncRound::Start(self); + } + }; + + match response::decode_and_verify(ctx.data(), &req) { + Ok(headers) => { + if self.sparse_headers.len() == 0 + && headers.get(0).map_or(false, |x| x.parent_hash() != &self.start_block.1) { + trace!(target: "sync", "Wrong parent for first header in round"); + ctx.punish_responder(); // or should we reset? + } + + self.contributors.insert(ctx.responder()); + self.sparse_headers.extend(headers); + + if self.sparse_headers.len() == ROUND_FRAMES + 1 { + trace!(target: "sync", "Beginning fetch of blocks between {} sparse headers", + self.sparse_headers.len()); + + return Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect()); + } + } + Err(e) => { + trace!(target: "sync", "Punishing peer {} for malformed response ({})", ctx.responder(), e); + ctx.punish_responder(); + } + }; + + self.failed_attempt() + } + + fn requests_abandoned(mut self, abandoned: &[ReqId]) -> SyncRound { + match self.pending_req.take() { + Some((id, req)) => { + if abandoned.iter().any(|r| r == &id) { + self.pending_req = None; + self.failed_attempt() + } else { + self.pending_req = Some((id, req)); + SyncRound::Start(self) + } + } + None => SyncRound::Start(self), + } + } + + fn dispatch_requests(mut self, mut dispatcher: D) -> SyncRound + where D: FnMut(HeadersRequest) -> Option + { + if self.pending_req.is_none() { + // beginning offset + first block expected after last header we have. + let start = (self.start_block.0 + 1) + + self.sparse_headers.len() as u64 * (ROUND_SKIP + 1); + + let headers_request = HeadersRequest { + start: start.into(), + max: (ROUND_FRAMES - 1) - self.sparse_headers.len(), + skip: ROUND_SKIP, + reverse: false, + }; + + if let Some(req_id) = dispatcher(headers_request.clone()) { + self.pending_req = Some((req_id, headers_request)); + } + } + + SyncRound::Start(self) + } +} + +/// Sync round state machine. +pub enum SyncRound { + /// Beginning a sync round. + Start(RoundStart), + /// Fetching intermediate blocks during a sync round. + Fetch(Fetcher), + /// Aborted. + Abort(AbortReason), +} + +impl SyncRound { + fn abort(reason: AbortReason) -> Self { + trace!(target: "sync", "Aborting sync round: {:?}", reason); + + SyncRound::Abort(reason) + } + + /// Begin sync rounds from a starting block. + pub fn begin(num: u64, hash: H256) -> Self { + SyncRound::Start(RoundStart::new((num, hash))) + } + + /// Process an answer to a request. Unknown requests will be ignored. + pub fn process_response(self, ctx: &R) -> Self { + match self { + SyncRound::Start(round_start) => round_start.process_response(ctx), + SyncRound::Fetch(fetcher) => fetcher.process_response(ctx), + other => other, + } + } + + /// Return unfulfilled requests from disconnected peer. Unknown requests will be ignored. + pub fn requests_abandoned(self, abandoned: &[ReqId]) -> Self { + match self { + SyncRound::Start(round_start) => round_start.requests_abandoned(abandoned), + SyncRound::Fetch(fetcher) => fetcher.requests_abandoned(abandoned), + other => other, + } + } + + /// Dispatch pending requests. The dispatcher provided will attempt to + /// find a suitable peer to serve the request. + // TODO: have dispatcher take capabilities argument? and return an error as + // to why no suitable peer can be found? (no buffer, no chain heads that high, etc) + pub fn dispatch_requests(self, dispatcher: D) -> Self + where D: FnMut(HeadersRequest) -> Option + { + match self { + SyncRound::Start(round_start) => round_start.dispatch_requests(dispatcher), + SyncRound::Fetch(fetcher) => fetcher.dispatch_requests(dispatcher), + other => other, + } + } + + /// Drain up to a maximum number (None -> all) of headers (continuous, starting with a child of + /// the round start block) from the round, starting a new one once finished. + pub fn drain(self, v: &mut Vec
, max: Option) -> Self { + match self { + SyncRound::Fetch(fetcher) => fetcher.drain(v, max), + other => other, + } + } +}