diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index 1d32361a7..7d4cb0fe8 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -66,6 +66,9 @@ pub trait LightChainClient: Send + Sync { /// Clear the queue. fn clear_queue(&self); + /// Flush the queue. + fn flush_queue(&self); + /// Get queue info. fn queue_info(&self) -> queue::QueueInfo; @@ -130,7 +133,7 @@ impl Client { BlockChainInfo { total_difficulty: best_block.total_difficulty, - pending_total_difficulty: best_block.total_difficulty, + pending_total_difficulty: best_block.total_difficulty + self.queue.total_difficulty(), genesis_hash: genesis_hash, best_block_hash: best_block.hash, best_block_number: best_block.number, @@ -151,6 +154,11 @@ impl Client { self.chain.get_header(id) } + /// Flush the header queue. + pub fn flush_queue(&self) { + self.queue.flush() + } + /// Get the `i`th CHT root. pub fn cht_root(&self, i: usize) -> Option { self.chain.cht_root(i) @@ -211,6 +219,10 @@ impl LightChainClient for Client { self.queue.clear() } + fn flush_queue(&self) { + Client::flush_queue(self); + } + fn queue_info(&self) -> queue::QueueInfo { self.queue.queue_info() } diff --git a/ethcore/light/src/net/buffer_flow.rs b/ethcore/light/src/net/buffer_flow.rs index 89ba2e6e8..04cca9969 100644 --- a/ethcore/light/src/net/buffer_flow.rs +++ b/ethcore/light/src/net/buffer_flow.rs @@ -23,7 +23,7 @@ //! This module provides an interface for configuration of buffer //! flow costs and recharge rates. //! -//! Current default costs are picked completely arbitrarily, not based +//! Current default costs are picked completely arbitrarily, not based //! on any empirical timings or mathematical models. use request; @@ -184,6 +184,23 @@ impl FlowParams { } } + /// Create effectively infinite flow params. + pub fn free() -> Self { + let free_cost = Cost(0.into(), 0.into()); + FlowParams { + limit: (!0u64).into(), + recharge: 1.into(), + costs: CostTable { + headers: free_cost.clone(), + bodies: free_cost.clone(), + receipts: free_cost.clone(), + state_proofs: free_cost.clone(), + contract_codes: free_cost.clone(), + header_proofs: free_cost.clone(), + } + } + } + /// Get a reference to the buffer limit. pub fn limit(&self) -> &U256 { &self.limit } @@ -209,7 +226,7 @@ impl FlowParams { cost.0 + (amount * cost.1) } - /// Compute the maximum number of costs of a specific kind which can be made + /// Compute the maximum number of costs of a specific kind which can be made /// with the given buffer. /// Saturates at `usize::max()`. This is not a problem in practice because /// this amount of requests is already prohibitively large. @@ -317,4 +334,4 @@ mod tests { assert_eq!(buffer.estimate, 100.into()); } -} \ No newline at end of file +} diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 20073c0af..384d75275 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -40,7 +40,6 @@ use self::buffer_flow::{Buffer, FlowParams}; use self::context::{Ctx, TickCtx}; use self::error::Punishment; -mod buffer_flow; mod context; mod error; mod status; @@ -48,6 +47,8 @@ mod status; #[cfg(test)] mod tests; +pub mod buffer_flow; + pub use self::error::Error; pub use self::context::{BasicContext, EventContext, IoContext}; pub use self::status::{Status, Capabilities, Announcement}; @@ -237,7 +238,7 @@ pub struct LightProtocol { pending_requests: RwLock>, capabilities: RwLock, flow_params: FlowParams, // assumed static and same for every peer. - handlers: Vec>, + handlers: Vec>, req_id: AtomicUsize, } @@ -376,11 +377,11 @@ impl LightProtocol { } /// Add an event handler. - /// Ownership will be transferred to the protocol structure, - /// and the handler will be kept alive as long as it is. + /// /// These are intended to be added when the protocol structure - /// is initialized as a means of customizing its behavior. - pub fn add_handler(&mut self, handler: Box) { + /// is initialized as a means of customizing its behavior, + /// and dispatching requests immediately upon events. + pub fn add_handler(&mut self, handler: Arc) { self.handlers.push(handler); } @@ -440,8 +441,10 @@ impl LightProtocol { } } - // handle a packet using the given io context. - fn handle_packet(&self, io: &IoContext, peer: &PeerId, packet_id: u8, data: &[u8]) { + /// Handle an LES packet using the given io context. + /// Packet data is _untrusted_, which means that invalid data won't lead to + /// issues. + pub fn handle_packet(&self, io: &IoContext, peer: &PeerId, packet_id: u8, data: &[u8]) { let rlp = UntrustedRlp::new(data); trace!(target: "les", "Incoming packet {} from peer {}", packet_id, peer); @@ -481,6 +484,71 @@ impl LightProtocol { } } + /// called when a peer connects. + pub fn on_connect(&self, peer: &PeerId, io: &IoContext) { + let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) { + Ok(pv) => pv, + Err(e) => { punish(*peer, io, e); return } + }; + + if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() { + punish(*peer, io, Error::UnsupportedProtocolVersion(proto_version)); + return; + } + + let chain_info = self.provider.chain_info(); + + let status = Status { + head_td: chain_info.total_difficulty, + head_hash: chain_info.best_block_hash, + head_num: chain_info.best_block_number, + genesis_hash: chain_info.genesis_hash, + protocol_version: proto_version as u32, // match peer proto version + network_id: self.network_id, + last_head: None, + }; + + let capabilities = self.capabilities.read().clone(); + let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params)); + + self.pending_peers.write().insert(*peer, PendingPeer { + sent_head: chain_info.best_block_hash, + last_update: SteadyTime::now(), + }); + + io.send(*peer, packet::STATUS, status_packet); + } + + /// called when a peer disconnects. + pub fn on_disconnect(&self, peer: PeerId, io: &IoContext) { + trace!(target: "les", "Peer {} disconnecting", peer); + + + self.pending_peers.write().remove(&peer); + if self.peers.write().remove(&peer).is_some() { + let unfulfilled: Vec<_> = self.pending_requests.read() + .iter() + .filter(|&(_, r)| r.peer_id == peer) + .map(|(&id, _)| ReqId(id)) + .collect(); + + { + let mut pending = self.pending_requests.write(); + for &ReqId(ref inner) in &unfulfilled { + pending.remove(inner); + } + } + + for handler in &self.handlers { + handler.on_disconnect(&Ctx { + peer: peer, + io: io, + proto: self, + }, &unfulfilled) + } + } + } + // check timeouts and punish peers. fn timeout_check(&self, io: &IoContext) { let now = SteadyTime::now(); @@ -529,6 +597,16 @@ impl LightProtocol { } } + /// Execute the given closure with a basic context derived from the I/O context. + pub fn with_context(&self, io: &IoContext, f: F) -> T + where F: FnOnce(&BasicContext) -> T + { + f(&TickCtx { + io: io, + proto: self, + }) + } + fn tick_handlers(&self, io: &IoContext) { for handler in &self.handlers { handler.tick(&TickCtx { @@ -540,71 +618,6 @@ impl LightProtocol { } impl LightProtocol { - // called when a peer connects. - fn on_connect(&self, peer: &PeerId, io: &IoContext) { - let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) { - Ok(pv) => pv, - Err(e) => { punish(*peer, io, e); return } - }; - - if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() { - punish(*peer, io, Error::UnsupportedProtocolVersion(proto_version)); - return; - } - - let chain_info = self.provider.chain_info(); - - let status = Status { - head_td: chain_info.total_difficulty, - head_hash: chain_info.best_block_hash, - head_num: chain_info.best_block_number, - genesis_hash: chain_info.genesis_hash, - protocol_version: proto_version as u32, // match peer proto version - network_id: self.network_id, - last_head: None, - }; - - let capabilities = self.capabilities.read().clone(); - let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params)); - - self.pending_peers.write().insert(*peer, PendingPeer { - sent_head: chain_info.best_block_hash, - last_update: SteadyTime::now(), - }); - - io.send(*peer, packet::STATUS, status_packet); - } - - // called when a peer disconnects. - fn on_disconnect(&self, peer: PeerId, io: &IoContext) { - trace!(target: "les", "Peer {} disconnecting", peer); - - - self.pending_peers.write().remove(&peer); - if self.peers.write().remove(&peer).is_some() { - let unfulfilled: Vec<_> = self.pending_requests.read() - .iter() - .filter(|&(_, r)| r.peer_id == peer) - .map(|(&id, _)| ReqId(id)) - .collect(); - - { - let mut pending = self.pending_requests.write(); - for &ReqId(ref inner) in &unfulfilled { - pending.remove(inner); - } - } - - for handler in &self.handlers { - handler.on_disconnect(&Ctx { - peer: peer, - io: io, - proto: self, - }, &unfulfilled) - } - } - } - // Handle status message from peer. fn status(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { let pending = match self.pending_peers.write().remove(peer) { diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index 56ff32b54..a0a9feee4 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -18,7 +18,7 @@ //! These don't test of the higher level logic on top of use ethcore::blockchain_info::BlockChainInfo; -use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient}; +use ethcore::client::{EachBlockWith, TestBlockChainClient}; use ethcore::ids::BlockId; use ethcore::transaction::PendingTransaction; use ethcore::encoded; @@ -88,7 +88,7 @@ impl Provider for TestProvider { } fn reorg_depth(&self, a: &H256, b: &H256) -> Option { - self.0.client.tree_route(a, b).map(|route| route.index as u64) + self.0.client.reorg_depth(a, b) } fn earliest_state(&self) -> Option { @@ -305,7 +305,9 @@ fn get_block_bodies() { } let request = request::Bodies { - block_hashes: (0..10).map(|i| provider.client.block_hash(BlockId::Number(i)).unwrap()).collect(), + block_hashes: (0..10).map(|i| + provider.client.block_header(BlockId::Number(i)).unwrap().hash() + ).collect() }; let req_id = 111; @@ -353,8 +355,9 @@ fn get_block_receipts() { // find the first 10 block hashes starting with `f` because receipts are only provided // by the test client in that case. - let block_hashes: Vec<_> = (0..1000).map(|i| provider.client.block_hash(BlockId::Number(i)).unwrap()) - .filter(|hash| format!("{}", hash).starts_with("f")).take(10).collect(); + let block_hashes: Vec<_> = (0..1000).map(|i| + provider.client.block_header(BlockId::Number(i)).unwrap().hash() + ).filter(|hash| format!("{}", hash).starts_with("f")).take(10).collect(); let request = request::Receipts { block_hashes: block_hashes.clone(), diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs index 0b94077ab..9a05d3499 100644 --- a/ethcore/light/src/provider.rs +++ b/ethcore/light/src/provider.rs @@ -83,7 +83,7 @@ pub trait Provider: Send + Sync { (0u64..req.max as u64) .map(|x: u64| x.saturating_mul(req.skip + 1)) - .take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num >= *x }) + .take_while(|x| if req.reverse { x < &start_num } else { best_num.saturating_sub(start_num) >= *x }) .map(|x| if req.reverse { start_num - x } else { start_num + x }) .map(|x| self.block_header(BlockId::Number(x))) .take_while(|x| x.is_some()) diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 22e61ab09..7568f86be 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -26,6 +26,7 @@ use blockchain::TreeRoute; use client::{ BlockChainClient, MiningBlockChainClient, EngineClient, BlockChainInfo, BlockStatus, BlockId, TransactionId, UncleId, TraceId, TraceFilter, LastHashes, CallAnalytics, BlockImportError, + ProvingBlockChainClient, }; use db::{NUM_COLUMNS, COL_STATE}; use header::{Header as BlockHeader, BlockNumber}; @@ -134,6 +135,9 @@ impl TestBlockChainClient { /// Create test client with custom spec and extra data. pub fn new_with_spec_and_extra(spec: Spec, extra_data: Bytes) -> Self { + let genesis_block = spec.genesis_block(); + let genesis_hash = spec.genesis_header().hash(); + let mut client = TestBlockChainClient { blocks: RwLock::new(HashMap::new()), numbers: RwLock::new(HashMap::new()), @@ -158,8 +162,12 @@ impl TestBlockChainClient { traces: RwLock::new(None), history: RwLock::new(None), }; - client.add_blocks(1, EachBlockWith::Nothing); // add genesis block - client.genesis_hash = client.last_hash.read().clone(); + + // insert genesis hash. + client.blocks.get_mut().insert(genesis_hash, genesis_block); + client.numbers.get_mut().insert(0, genesis_hash); + *client.last_hash.get_mut() = genesis_hash; + client.genesis_hash = genesis_hash; client } @@ -720,6 +728,20 @@ impl BlockChainClient for TestBlockChainClient { fn registry_address(&self, _name: String) -> Option
{ None } } +impl ProvingBlockChainClient for TestBlockChainClient { + fn prove_storage(&self, _: H256, _: H256, _: u32, _: BlockId) -> Vec { + Vec::new() + } + + fn prove_account(&self, _: H256, _: u32, _: BlockId) -> Vec { + Vec::new() + } + + fn code_by_hash(&self, _: H256, _: BlockId) -> Bytes { + Vec::new() + } +} + impl EngineClient for TestBlockChainClient { fn update_sealing(&self) { self.miner.update_sealing(self) diff --git a/sync/src/api.rs b/sync/src/api.rs index 36f4a0d9a..1aa8213bd 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -179,7 +179,7 @@ impl EthSync { }; let mut light_proto = LightProtocol::new(params.provider, light_params); - light_proto.add_handler(Box::new(TxRelay(params.chain.clone()))); + light_proto.add_handler(Arc::new(TxRelay(params.chain.clone()))); Arc::new(light_proto) }) @@ -612,7 +612,7 @@ impl LightSync { let mut light_proto = LightProtocol::new(params.client.clone(), light_params); let sync_handler = try!(SyncHandler::new(params.client.clone())); - light_proto.add_handler(Box::new(sync_handler)); + light_proto.add_handler(Arc::new(sync_handler)); Arc::new(light_proto) }; diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index 226b1fdff..de7d7b05a 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -23,6 +23,14 @@ //! //! This is written assuming that the client and sync service are running //! in the same binary; unlike a full node which might communicate via IPC. +//! +//! +//! Sync strategy: +//! - Find a common ancestor with peers. +//! - Split the chain up into subchains, which are downloaded in parallel from various peers in rounds. +//! - When within a certain distance of the head of the chain, aggressively download all +//! announced blocks. +//! - On bad block/response, punish peer and reset. use std::collections::HashMap; use std::mem; @@ -43,6 +51,9 @@ use self::sync_round::{AbortReason, SyncRound, ResponseContext}; mod response; mod sync_round; +#[cfg(test)] +mod tests; + /// Peer chain info. #[derive(Clone)] struct ChainInfo { @@ -64,6 +75,7 @@ impl Peer { } } // search for a common ancestor with the best chain. +#[derive(Debug)] enum AncestorSearch { Queued(u64), // queued to search for blocks starting from here. Awaiting(ReqId, u64, request::Headers), // awaiting response for this request. @@ -125,6 +137,9 @@ impl AncestorSearch { match self { AncestorSearch::Queued(start) => { + trace!(target: "sync", "Requesting {} reverse headers from {} to find common ancestor", + BATCH_SIZE, start); + let req = request::Headers { start: start.into(), max: ::std::cmp::min(start as usize, BATCH_SIZE), @@ -143,8 +158,9 @@ impl AncestorSearch { } // synchronization state machine. +#[derive(Debug)] enum SyncState { - // Idle (waiting for peers) + // Idle (waiting for peers) or at chain head. Idle, // searching for common ancestor with best chain. // queue should be cleared at this phase. @@ -328,19 +344,19 @@ impl LightSync { return; } - trace!(target: "sync", "Beginning search for common ancestor"); - self.client.clear_queue(); + self.client.flush_queue(); let chain_info = self.client.chain_info(); + trace!(target: "sync", "Beginning search for common ancestor from {:?}", + (chain_info.best_block_number, chain_info.best_block_hash)); *state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number)); } fn maintain_sync(&self, ctx: &BasicContext) { const DRAIN_AMOUNT: usize = 128; - debug!(target: "sync", "Maintaining sync."); - let mut state = self.state.lock(); + debug!(target: "sync", "Maintaining sync ({:?})", &*state); // drain any pending blocks into the queue. { @@ -358,6 +374,7 @@ impl LightSync { }; if sink.is_empty() { break } + trace!(target: "sync", "Drained {} headers to import", sink.len()); for header in sink.drain(..) { if let Err(e) = self.client.queue_header(header) { @@ -372,8 +389,12 @@ impl LightSync { // handle state transitions. { + let chain_info = self.client.chain_info(); + let best_td = chain_info.total_difficulty; match mem::replace(&mut *state, SyncState::Idle) { - SyncState::Rounds(SyncRound::Abort(reason)) => { + _ if self.best_seen.lock().as_ref().map_or(true, |&(_, td)| best_td >= td) + => *state = SyncState::Idle, + SyncState::Rounds(SyncRound::Abort(reason, _)) => { match reason { AbortReason::BadScaffold(bad_peers) => { debug!(target: "sync", "Disabling peers responsible for bad scaffold"); @@ -394,7 +415,7 @@ impl LightSync { } SyncState::AncestorSearch(AncestorSearch::Genesis) => { // Same here. - let g_hash = self.client.chain_info().genesis_hash; + let g_hash = chain_info.genesis_hash; *state = SyncState::Rounds(SyncRound::begin(0, g_hash)); } SyncState::Idle => self.begin_search(&mut state), diff --git a/sync/src/light_sync/sync_round.rs b/sync/src/light_sync/sync_round.rs index dc1927aae..29d93daa8 100644 --- a/sync/src/light_sync/sync_round.rs +++ b/sync/src/light_sync/sync_round.rs @@ -18,6 +18,7 @@ use std::cmp::Ordering; use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; +use std::fmt; use ethcore::header::Header; @@ -29,9 +30,9 @@ use util::{Bytes, H256}; use super::response; -// amount of blocks between each scaffold entry. +/// amount of blocks between each scaffold entry. // TODO: move these into parameters for `RoundStart::new`? -const ROUND_SKIP: u64 = 255; +pub const ROUND_SKIP: u64 = 255; // amount of scaffold frames: these are the blank spaces in "X___X___X" const ROUND_FRAMES: usize = 255; @@ -132,7 +133,7 @@ impl Fetcher { let end = match sparse_headers.last().map(|h| (h.number(), h.hash())) { Some(end) => end, - None => return SyncRound::abort(AbortReason::BadScaffold(contributors)), + None => return SyncRound::abort(AbortReason::BadScaffold(contributors), VecDeque::new()), }; SyncRound::Fetch(Fetcher { @@ -217,10 +218,11 @@ impl Fetcher { let subchain_parent = request.subchain_parent.1; + // check if the subchain portion has been completely filled. if request.headers_request.max == 0 { if parent_hash.map_or(true, |hash| hash != subchain_parent) { let abort = AbortReason::BadScaffold(self.scaffold_contributors); - return SyncRound::Abort(abort); + return SyncRound::abort(abort, self.ready); } self.complete_requests.insert(subchain_parent, request); @@ -271,6 +273,7 @@ impl Fetcher { headers.extend(self.ready.drain(0..max)); if self.sparse.is_empty() && self.ready.is_empty() { + trace!(target: "sync", "sync round complete. Starting anew from {:?}", self.end); SyncRound::Start(RoundStart::new(self.end)) } else { SyncRound::Fetch(self) @@ -309,7 +312,7 @@ impl RoundStart { if self.sparse_headers.len() > 1 { Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect()) } else { - SyncRound::Abort(AbortReason::NoResponses) + SyncRound::Abort(AbortReason::NoResponses, self.sparse_headers.into()) } } else { SyncRound::Start(self) @@ -375,14 +378,19 @@ impl RoundStart { let start = (self.start_block.0 + 1) + self.sparse_headers.len() as u64 * (ROUND_SKIP + 1); + let max = (ROUND_FRAMES - 1) - self.sparse_headers.len(); + let headers_request = HeadersRequest { start: start.into(), - max: (ROUND_FRAMES - 1) - self.sparse_headers.len(), + max: max, skip: ROUND_SKIP, reverse: false, }; if let Some(req_id) = dispatcher(headers_request.clone()) { + trace!(target: "sync", "Requesting scaffold: {} headers forward from {}, skip={}", + max, start, ROUND_SKIP); + self.pending_req = Some((req_id, headers_request)); } } @@ -397,15 +405,15 @@ pub enum SyncRound { Start(RoundStart), /// Fetching intermediate blocks during a sync round. Fetch(Fetcher), - /// Aborted. - Abort(AbortReason), + /// Aborted + Sequential headers + Abort(AbortReason, VecDeque
), } impl SyncRound { - fn abort(reason: AbortReason) -> Self { - trace!(target: "sync", "Aborting sync round: {:?}", reason); + fn abort(reason: AbortReason, remaining: VecDeque
) -> Self { + trace!(target: "sync", "Aborting sync round: {:?}. To drain: {:?}", reason, remaining); - SyncRound::Abort(reason) + SyncRound::Abort(reason, remaining) } /// Begin sync rounds from a starting block. @@ -450,7 +458,23 @@ impl SyncRound { pub fn drain(self, v: &mut Vec
, max: Option) -> Self { match self { SyncRound::Fetch(fetcher) => fetcher.drain(v, max), + SyncRound::Abort(reason, mut remaining) => { + let len = ::std::cmp::min(max.unwrap_or(usize::max_value()), remaining.len()); + v.extend(remaining.drain(..len)); + SyncRound::Abort(reason, remaining) + } other => other, } } } + +impl fmt::Debug for SyncRound { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + SyncRound::Start(ref state) => write!(f, "Scaffolding from {:?}", state.start_block), + SyncRound::Fetch(ref fetcher) => write!(f, "Filling scaffold up to {:?}", fetcher.end), + SyncRound::Abort(ref reason, ref remaining) => + write!(f, "Aborted: {:?}, {} remain", reason, remaining.len()), + } + } +} diff --git a/sync/src/light_sync/tests/mod.rs b/sync/src/light_sync/tests/mod.rs new file mode 100644 index 000000000..9eefbec41 --- /dev/null +++ b/sync/src/light_sync/tests/mod.rs @@ -0,0 +1,19 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +#![allow(dead_code)] + +mod test_net; diff --git a/sync/src/light_sync/tests/test_net.rs b/sync/src/light_sync/tests/test_net.rs new file mode 100644 index 000000000..fa5724666 --- /dev/null +++ b/sync/src/light_sync/tests/test_net.rs @@ -0,0 +1,211 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! TestNet peer definition. + +use std::collections::{HashSet, VecDeque}; +use std::sync::Arc; + +use light_sync::*; +use tests::helpers::{TestNet, Peer as PeerLike, TestPacket}; + +use ethcore::client::TestBlockChainClient; +use ethcore::spec::Spec; +use io::IoChannel; +use light::client::Client as LightClient; +use light::net::{LightProtocol, IoContext, Capabilities, Params as LightParams}; +use light::net::buffer_flow::FlowParams; +use network::{NodeId, PeerId}; +use util::RwLock; + +const NETWORK_ID: u64 = 0xcafebabe; + +struct TestIoContext<'a> { + queue: &'a RwLock>, + sender: Option, + to_disconnect: RwLock>, +} + +impl<'a> IoContext for TestIoContext<'a> { + fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec) { + self.queue.write().push_back(TestPacket { + data: packet_body, + packet_id: packet_id, + recipient: peer, + }) + } + + fn respond(&self, packet_id: u8, packet_body: Vec) { + if let Some(sender) = self.sender { + self.send(sender, packet_id, packet_body); + } + } + + fn disconnect_peer(&self, peer: PeerId) { + self.to_disconnect.write().insert(peer); + } + + fn disable_peer(&self, peer: PeerId) { self.disconnect_peer(peer) } + fn protocol_version(&self, _peer: PeerId) -> Option { Some(::light::net::MAX_PROTOCOL_VERSION) } + + fn persistent_peer_id(&self, _peer: PeerId) -> Option { unimplemented!() } +} + +// peer-specific data. +enum PeerData { + Light(Arc>, Arc), + Full(Arc) +} + +// test peer type. +// Either a full peer or a LES peer. +pub struct Peer { + proto: LightProtocol, + queue: RwLock>, + data: PeerData, +} + +impl Peer { + // create a new full-client peer for light client peers to sync to. + // buffer flow is made negligible. + pub fn new_full(chain: Arc) -> Self { + let params = LightParams { + network_id: NETWORK_ID, + flow_params: FlowParams::free(), + capabilities: Capabilities { + serve_headers: true, + serve_chain_since: None, + serve_state_since: None, + tx_relay: true, + }, + }; + + let proto = LightProtocol::new(chain.clone(), params); + Peer { + proto: proto, + queue: RwLock::new(VecDeque::new()), + data: PeerData::Full(chain), + } + } + + // create a new light-client peer to sync to full peers. + pub fn new_light(chain: Arc) -> Self { + let sync = Arc::new(LightSync::new(chain.clone()).unwrap()); + let params = LightParams { + network_id: NETWORK_ID, + flow_params: FlowParams::default(), + capabilities: Capabilities { + serve_headers: false, + serve_chain_since: None, + serve_state_since: None, + tx_relay: false, + }, + }; + + let mut proto = LightProtocol::new(chain.clone(), params); + proto.add_handler(sync.clone()); + Peer { + proto: proto, + queue: RwLock::new(VecDeque::new()), + data: PeerData::Light(sync, chain), + } + } + + // get the chain from the client, asserting that it is a full node. + pub fn chain(&self) -> &TestBlockChainClient { + match self.data { + PeerData::Full(ref chain) => &*chain, + _ => panic!("Attempted to access full chain on light peer."), + } + } + + // get the light chain from the peer, asserting that it is a light node. + pub fn light_chain(&self) -> &LightClient { + match self.data { + PeerData::Light(_, ref chain) => &*chain, + _ => panic!("Attempted to access light chain on full peer."), + } + } + + // get a test Io context based on + fn io(&self, sender: Option) -> TestIoContext { + TestIoContext { + queue: &self.queue, + sender: sender, + to_disconnect: RwLock::new(HashSet::new()), + } + } +} + +impl PeerLike for Peer { + type Message = TestPacket; + + fn on_connect(&self, other: PeerId) { + let io = self.io(Some(other)); + self.proto.on_connect(&other, &io); + } + + fn on_disconnect(&self, other: PeerId){ + let io = self.io(Some(other)); + self.proto.on_disconnect(other, &io); + } + + fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet { + let io = self.io(Some(from)); + self.proto.handle_packet(&io, &from, msg.packet_id, &msg.data); + io.to_disconnect.into_inner() + } + + fn pending_message(&self) -> Option { + self.queue.write().pop_front() + } + + fn is_done(&self) -> bool { + self.queue.read().is_empty() + } + + fn sync_step(&self) { + if let PeerData::Light(_, ref client) = self.data { + client.flush_queue(); + client.import_verified(); + } + } + + fn restart_sync(&self) { } +} + +impl TestNet { + /// Create a new `TestNet` for testing light synchronization. + /// The first parameter is the number of light nodes, + /// the second is the number of full nodes. + pub fn light(n_light: usize, n_full: usize) -> Self { + let mut peers = Vec::with_capacity(n_light + n_full); + for _ in 0..n_light { + let client = LightClient::new(Default::default(), &Spec::new_test(), IoChannel::disconnected()); + peers.push(Arc::new(Peer::new_light(Arc::new(client)))) + } + + for _ in 0..n_full { + peers.push(Arc::new(Peer::new_full(Arc::new(TestBlockChainClient::new())))) + } + + TestNet { + peers: peers, + started: false, + disconnect_events: Vec::new(), + } + } +} diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 3a598ba62..8ffc30711 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -100,8 +100,11 @@ fn forked() { fn forked_with_misbehaving_peer() { ::env_logger::init().ok(); let mut net = TestNet::new(3); + + let mut alt_spec = ::ethcore::spec::Spec::new_test(); + alt_spec.extra_data = b"fork".to_vec(); // peer 0 is on a totally different chain with higher total difficulty - net.peer_mut(0).chain = Arc::new(TestBlockChainClient::new_with_extra_data(b"fork".to_vec())); + net.peer_mut(0).chain = Arc::new(TestBlockChainClient::new_with_spec(alt_spec)); net.peer(0).chain.add_blocks(50, EachBlockWith::Nothing); net.peer(1).chain.add_blocks(10, EachBlockWith::Nothing); net.peer(2).chain.add_blocks(10, EachBlockWith::Nothing);