Test harness for lightsync (#4109)

* make on_connect/disconnect public

* free flow params constructor

* Shared ownership of LES handlers

* light provider impl for test client

* skeleton for testing light sync

* have test_client use actual genesis

* fix underflow in provider

* test harnesses for lightsync

* fix tests

* fix test failure caused by test_client changes
Robert Habermeier 2017-01-11 14:39:03 +01:00 committed by Arkadiy Paronyan
parent 7286d42b7d
commit 7123f19a75
12 changed files with 451 additions and 106 deletions

View File

@ -66,6 +66,9 @@ pub trait LightChainClient: Send + Sync {
/// Clear the queue.
fn clear_queue(&self);
/// Flush the queue.
fn flush_queue(&self);
/// Get queue info.
fn queue_info(&self) -> queue::QueueInfo;
@ -130,7 +133,7 @@ impl Client {
BlockChainInfo {
total_difficulty: best_block.total_difficulty,
pending_total_difficulty: best_block.total_difficulty,
pending_total_difficulty: best_block.total_difficulty + self.queue.total_difficulty(),
genesis_hash: genesis_hash,
best_block_hash: best_block.hash,
best_block_number: best_block.number,
@ -151,6 +154,11 @@ impl Client {
self.chain.get_header(id)
}
/// Flush the header queue.
pub fn flush_queue(&self) {
self.queue.flush()
}
/// Get the `i`th CHT root.
pub fn cht_root(&self, i: usize) -> Option<H256> {
self.chain.cht_root(i)
@ -211,6 +219,10 @@ impl LightChainClient for Client {
self.queue.clear()
}
fn flush_queue(&self) {
Client::flush_queue(self);
}
fn queue_info(&self) -> queue::QueueInfo {
self.queue.queue_info()
}

View File

@ -23,7 +23,7 @@
//! This module provides an interface for configuration of buffer
//! flow costs and recharge rates.
//!
//! Current default costs are picked completely arbitrarily, not based
//! on any empirical timings or mathematical models.
use request;
@ -184,6 +184,23 @@ impl FlowParams {
}
}
/// Create effectively infinite flow params.
pub fn free() -> Self {
let free_cost = Cost(0.into(), 0.into());
FlowParams {
limit: (!0u64).into(),
recharge: 1.into(),
costs: CostTable {
headers: free_cost.clone(),
bodies: free_cost.clone(),
receipts: free_cost.clone(),
state_proofs: free_cost.clone(),
contract_codes: free_cost.clone(),
header_proofs: free_cost.clone(),
}
}
}
/// Get a reference to the buffer limit.
pub fn limit(&self) -> &U256 { &self.limit }
@ -209,7 +226,7 @@ impl FlowParams {
cost.0 + (amount * cost.1)
}
/// Compute the maximum number of costs of a specific kind which can be made
/// with the given buffer.
/// Saturates at `usize::max()`. This is not a problem in practice because
/// this amount of requests is already prohibitively large.
@ -317,4 +334,4 @@ mod tests {
assert_eq!(buffer.estimate, 100.into());
}
}
}
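
The cost model behind the new free() constructor is linear: a request is charged a base cost plus a per-item cost against the peer's buffer, exactly as the compute_cost hunk above shows. With free(), both components are zero and the limit is the maximum u64, so no request ever exhausts the buffer. A stand-alone sketch of that arithmetic with plain integers (the charge helper is illustrative, not the crate's API):

// Illustrative only: the linear buffer-flow charge, with plain u64 in
// place of the crate's U256 type.
fn charge(base: u64, per_item: u64, amount: u64) -> u64 {
    base + amount * per_item
}

fn main() {
    // A default-style cost: a base charge plus a per-header charge.
    assert_eq!(charge(100, 10, 64), 100 + 64 * 10);

    // With `FlowParams::free()` both components are zero, so any request
    // costs nothing against the effectively unbounded (!0u64) limit.
    assert_eq!(charge(0, 0, 64), 0);
}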

View File

@ -40,7 +40,6 @@ use self::buffer_flow::{Buffer, FlowParams};
use self::context::{Ctx, TickCtx};
use self::error::Punishment;
mod buffer_flow;
mod context;
mod error;
mod status;
@ -48,6 +47,8 @@ mod status;
#[cfg(test)]
mod tests;
pub mod buffer_flow;
pub use self::error::Error;
pub use self::context::{BasicContext, EventContext, IoContext};
pub use self::status::{Status, Capabilities, Announcement};
@ -237,7 +238,7 @@ pub struct LightProtocol {
pending_requests: RwLock<HashMap<usize, Requested>>,
capabilities: RwLock<Capabilities>,
flow_params: FlowParams, // assumed static and same for every peer.
handlers: Vec<Box<Handler>>,
handlers: Vec<Arc<Handler>>,
req_id: AtomicUsize,
}
@ -376,11 +377,11 @@ impl LightProtocol {
}
/// Add an event handler.
/// Ownership will be transferred to the protocol structure,
/// and the handler will be kept alive as long as it is.
///
/// These are intended to be added when the protocol structure
/// is initialized as a means of customizing its behavior.
pub fn add_handler(&mut self, handler: Box<Handler>) {
/// is initialized as a means of customizing its behavior,
/// and dispatching requests immediately upon events.
pub fn add_handler(&mut self, handler: Arc<Handler>) {
self.handlers.push(handler);
}
@ -440,8 +441,10 @@ impl LightProtocol {
}
}
// handle a packet using the given io context.
fn handle_packet(&self, io: &IoContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
/// Handle an LES packet using the given io context.
/// Packet data is _untrusted_, which means that invalid data won't lead to
/// issues.
pub fn handle_packet(&self, io: &IoContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
let rlp = UntrustedRlp::new(data);
trace!(target: "les", "Incoming packet {} from peer {}", packet_id, peer);
@ -481,6 +484,71 @@ impl LightProtocol {
}
}
/// called when a peer connects.
pub fn on_connect(&self, peer: &PeerId, io: &IoContext) {
let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) {
Ok(pv) => pv,
Err(e) => { punish(*peer, io, e); return }
};
if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() {
punish(*peer, io, Error::UnsupportedProtocolVersion(proto_version));
return;
}
let chain_info = self.provider.chain_info();
let status = Status {
head_td: chain_info.total_difficulty,
head_hash: chain_info.best_block_hash,
head_num: chain_info.best_block_number,
genesis_hash: chain_info.genesis_hash,
protocol_version: proto_version as u32, // match peer proto version
network_id: self.network_id,
last_head: None,
};
let capabilities = self.capabilities.read().clone();
let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params));
self.pending_peers.write().insert(*peer, PendingPeer {
sent_head: chain_info.best_block_hash,
last_update: SteadyTime::now(),
});
io.send(*peer, packet::STATUS, status_packet);
}
/// called when a peer disconnects.
pub fn on_disconnect(&self, peer: PeerId, io: &IoContext) {
trace!(target: "les", "Peer {} disconnecting", peer);
self.pending_peers.write().remove(&peer);
if self.peers.write().remove(&peer).is_some() {
let unfulfilled: Vec<_> = self.pending_requests.read()
.iter()
.filter(|&(_, r)| r.peer_id == peer)
.map(|(&id, _)| ReqId(id))
.collect();
{
let mut pending = self.pending_requests.write();
for &ReqId(ref inner) in &unfulfilled {
pending.remove(inner);
}
}
for handler in &self.handlers {
handler.on_disconnect(&Ctx {
peer: peer,
io: io,
proto: self,
}, &unfulfilled)
}
}
}
// check timeouts and punish peers.
fn timeout_check(&self, io: &IoContext) {
let now = SteadyTime::now();
@ -529,6 +597,16 @@ impl LightProtocol {
}
}
/// Execute the given closure with a basic context derived from the I/O context.
pub fn with_context<F, T>(&self, io: &IoContext, f: F) -> T
where F: FnOnce(&BasicContext) -> T
{
f(&TickCtx {
io: io,
proto: self,
})
}
fn tick_handlers(&self, io: &IoContext) {
for handler in &self.handlers {
handler.tick(&TickCtx {
@ -540,71 +618,6 @@ impl LightProtocol {
}
impl LightProtocol {
// called when a peer connects.
fn on_connect(&self, peer: &PeerId, io: &IoContext) {
let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) {
Ok(pv) => pv,
Err(e) => { punish(*peer, io, e); return }
};
if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() {
punish(*peer, io, Error::UnsupportedProtocolVersion(proto_version));
return;
}
let chain_info = self.provider.chain_info();
let status = Status {
head_td: chain_info.total_difficulty,
head_hash: chain_info.best_block_hash,
head_num: chain_info.best_block_number,
genesis_hash: chain_info.genesis_hash,
protocol_version: proto_version as u32, // match peer proto version
network_id: self.network_id,
last_head: None,
};
let capabilities = self.capabilities.read().clone();
let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params));
self.pending_peers.write().insert(*peer, PendingPeer {
sent_head: chain_info.best_block_hash,
last_update: SteadyTime::now(),
});
io.send(*peer, packet::STATUS, status_packet);
}
// called when a peer disconnects.
fn on_disconnect(&self, peer: PeerId, io: &IoContext) {
trace!(target: "les", "Peer {} disconnecting", peer);
self.pending_peers.write().remove(&peer);
if self.peers.write().remove(&peer).is_some() {
let unfulfilled: Vec<_> = self.pending_requests.read()
.iter()
.filter(|&(_, r)| r.peer_id == peer)
.map(|(&id, _)| ReqId(id))
.collect();
{
let mut pending = self.pending_requests.write();
for &ReqId(ref inner) in &unfulfilled {
pending.remove(inner);
}
}
for handler in &self.handlers {
handler.on_disconnect(&Ctx {
peer: peer,
io: io,
proto: self,
}, &unfulfilled)
}
}
}
// Handle status message from peer.
fn status(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
let pending = match self.pending_peers.write().remove(peer) {
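
The switch from Box&lt;Handler&gt; to Arc&lt;Handler&gt; above is what allows a handler to be registered with the protocol while the caller keeps its own reference to it, which the sync code and the new test harness both rely on. A minimal stand-alone illustration of that ownership difference, using a stand-in trait rather than the real LES Handler:

use std::sync::Arc;

// Stand-in for the LES `Handler` trait; only the ownership story matters here.
trait Handler: Send + Sync {
    fn tick(&self);
}

struct Protocol {
    handlers: Vec<Arc<dyn Handler>>,
}

impl Protocol {
    // Mirrors `add_handler` after this change: the protocol holds one
    // reference and the caller is free to keep another.
    fn add_handler(&mut self, handler: Arc<dyn Handler>) {
        self.handlers.push(handler);
    }

    fn tick_handlers(&self) {
        for handler in &self.handlers {
            handler.tick();
        }
    }
}

struct SyncHandler;

impl Handler for SyncHandler {
    fn tick(&self) { /* drive sync from here */ }
}

fn main() {
    let sync = Arc::new(SyncHandler);
    let mut proto = Protocol { handlers: Vec::new() };

    // With `Box<Handler>` the handler's ownership would move into the
    // protocol. With `Arc`, the caller still holds `sync` and can query or
    // drive it directly while the protocol also keeps it alive.
    proto.add_handler(sync.clone());
    proto.tick_handlers();

    assert_eq!(Arc::strong_count(&sync), 2);
}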

View File

@ -18,7 +18,7 @@
//! These don't test the higher level logic on top of
use ethcore::blockchain_info::BlockChainInfo;
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient};
use ethcore::client::{EachBlockWith, TestBlockChainClient};
use ethcore::ids::BlockId;
use ethcore::transaction::PendingTransaction;
use ethcore::encoded;
@ -88,7 +88,7 @@ impl Provider for TestProvider {
}
fn reorg_depth(&self, a: &H256, b: &H256) -> Option<u64> {
self.0.client.tree_route(a, b).map(|route| route.index as u64)
self.0.client.reorg_depth(a, b)
}
fn earliest_state(&self) -> Option<u64> {
@ -305,7 +305,9 @@ fn get_block_bodies() {
}
let request = request::Bodies {
block_hashes: (0..10).map(|i| provider.client.block_hash(BlockId::Number(i)).unwrap()).collect(),
block_hashes: (0..10).map(|i|
provider.client.block_header(BlockId::Number(i)).unwrap().hash()
).collect()
};
let req_id = 111;
@ -353,8 +355,9 @@ fn get_block_receipts() {
// find the first 10 block hashes starting with `f` because receipts are only provided
// by the test client in that case.
let block_hashes: Vec<_> = (0..1000).map(|i| provider.client.block_hash(BlockId::Number(i)).unwrap())
.filter(|hash| format!("{}", hash).starts_with("f")).take(10).collect();
let block_hashes: Vec<_> = (0..1000).map(|i|
provider.client.block_header(BlockId::Number(i)).unwrap().hash()
).filter(|hash| format!("{}", hash).starts_with("f")).take(10).collect();
let request = request::Receipts {
block_hashes: block_hashes.clone(),

View File

@ -83,7 +83,7 @@ pub trait Provider: Send + Sync {
(0u64..req.max as u64)
.map(|x: u64| x.saturating_mul(req.skip + 1))
.take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num >= *x })
.take_while(|x| if req.reverse { x < &start_num } else { best_num.saturating_sub(start_num) >= *x })
.map(|x| if req.reverse { start_num - x } else { start_num + x })
.map(|x| self.block_header(BlockId::Number(x)))
.take_while(|x| x.is_some())
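
The one-line change above fixes an arithmetic underflow: for a forward (non-reverse) headers request whose start block lies beyond the local best block, best_num - start_num panics in debug builds and wraps around in release. A self-contained sketch of the before/after behaviour with plain u64 values (the numbers are arbitrary examples):

fn main() {
    let best_num: u64 = 5;   // local chain head
    let start_num: u64 = 10; // request starts past the head

    // Old condition: `best_num - start_num >= *x`. Here the subtraction
    // underflows, panicking in debug builds and wrapping to a huge value
    // in release, which would let the iterator run far past the head.

    // New condition: the subtraction saturates at zero instead.
    assert_eq!(best_num.saturating_sub(start_num), 0);

    // Only the offset-0 candidate survives the range check, and the
    // provider's subsequent `block_header(..)` lookup returns `None` for
    // it, so no bogus header is actually served.
    let candidates: Vec<u64> = (0u64..4)
        .take_while(|x| best_num.saturating_sub(start_num) >= *x)
        .map(|x| start_num + x)
        .collect();
    assert_eq!(candidates, vec![10]);
}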

View File

@ -26,6 +26,7 @@ use blockchain::TreeRoute;
use client::{
BlockChainClient, MiningBlockChainClient, EngineClient, BlockChainInfo, BlockStatus, BlockId,
TransactionId, UncleId, TraceId, TraceFilter, LastHashes, CallAnalytics, BlockImportError,
ProvingBlockChainClient,
};
use db::{NUM_COLUMNS, COL_STATE};
use header::{Header as BlockHeader, BlockNumber};
@ -134,6 +135,9 @@ impl TestBlockChainClient {
/// Create test client with custom spec and extra data.
pub fn new_with_spec_and_extra(spec: Spec, extra_data: Bytes) -> Self {
let genesis_block = spec.genesis_block();
let genesis_hash = spec.genesis_header().hash();
let mut client = TestBlockChainClient {
blocks: RwLock::new(HashMap::new()),
numbers: RwLock::new(HashMap::new()),
@ -158,8 +162,12 @@ impl TestBlockChainClient {
traces: RwLock::new(None),
history: RwLock::new(None),
};
client.add_blocks(1, EachBlockWith::Nothing); // add genesis block
client.genesis_hash = client.last_hash.read().clone();
// insert genesis hash.
client.blocks.get_mut().insert(genesis_hash, genesis_block);
client.numbers.get_mut().insert(0, genesis_hash);
*client.last_hash.get_mut() = genesis_hash;
client.genesis_hash = genesis_hash;
client
}
@ -720,6 +728,20 @@ impl BlockChainClient for TestBlockChainClient {
fn registry_address(&self, _name: String) -> Option<Address> { None }
}
impl ProvingBlockChainClient for TestBlockChainClient {
fn prove_storage(&self, _: H256, _: H256, _: u32, _: BlockId) -> Vec<Bytes> {
Vec::new()
}
fn prove_account(&self, _: H256, _: u32, _: BlockId) -> Vec<Bytes> {
Vec::new()
}
fn code_by_hash(&self, _: H256, _: BlockId) -> Bytes {
Vec::new()
}
}
impl EngineClient for TestBlockChainClient {
fn update_sealing(&self) {
self.miner.update_sealing(self)

View File

@ -179,7 +179,7 @@ impl EthSync {
};
let mut light_proto = LightProtocol::new(params.provider, light_params);
light_proto.add_handler(Box::new(TxRelay(params.chain.clone())));
light_proto.add_handler(Arc::new(TxRelay(params.chain.clone())));
Arc::new(light_proto)
})
@ -612,7 +612,7 @@ impl LightSync {
let mut light_proto = LightProtocol::new(params.client.clone(), light_params);
let sync_handler = try!(SyncHandler::new(params.client.clone()));
light_proto.add_handler(Box::new(sync_handler));
light_proto.add_handler(Arc::new(sync_handler));
Arc::new(light_proto)
};

View File

@ -23,6 +23,14 @@
//!
//! This is written assuming that the client and sync service are running
//! in the same binary; unlike a full node which might communicate via IPC.
//!
//!
//! Sync strategy:
//! - Find a common ancestor with peers.
//! - Split the chain up into subchains, which are downloaded in parallel from various peers in rounds.
//! - When within a certain distance of the head of the chain, aggressively download all
//! announced blocks.
//! - On bad block/response, punish peer and reset.
use std::collections::HashMap;
use std::mem;
@ -43,6 +51,9 @@ use self::sync_round::{AbortReason, SyncRound, ResponseContext};
mod response;
mod sync_round;
#[cfg(test)]
mod tests;
/// Peer chain info.
#[derive(Clone)]
struct ChainInfo {
@ -64,6 +75,7 @@ impl Peer {
}
}
// search for a common ancestor with the best chain.
#[derive(Debug)]
enum AncestorSearch {
Queued(u64), // queued to search for blocks starting from here.
Awaiting(ReqId, u64, request::Headers), // awaiting response for this request.
@ -125,6 +137,9 @@ impl AncestorSearch {
match self {
AncestorSearch::Queued(start) => {
trace!(target: "sync", "Requesting {} reverse headers from {} to find common ancestor",
BATCH_SIZE, start);
let req = request::Headers {
start: start.into(),
max: ::std::cmp::min(start as usize, BATCH_SIZE),
@ -143,8 +158,9 @@ impl AncestorSearch {
}
// synchronization state machine.
#[derive(Debug)]
enum SyncState {
// Idle (waiting for peers)
// Idle (waiting for peers) or at chain head.
Idle,
// searching for common ancestor with best chain.
// queue should be cleared at this phase.
@ -328,19 +344,19 @@ impl<L: LightChainClient> LightSync<L> {
return;
}
trace!(target: "sync", "Beginning search for common ancestor");
self.client.clear_queue();
self.client.flush_queue();
let chain_info = self.client.chain_info();
trace!(target: "sync", "Beginning search for common ancestor from {:?}",
(chain_info.best_block_number, chain_info.best_block_hash));
*state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number));
}
fn maintain_sync(&self, ctx: &BasicContext) {
const DRAIN_AMOUNT: usize = 128;
debug!(target: "sync", "Maintaining sync.");
let mut state = self.state.lock();
debug!(target: "sync", "Maintaining sync ({:?})", &*state);
// drain any pending blocks into the queue.
{
@ -358,6 +374,7 @@ impl<L: LightChainClient> LightSync<L> {
};
if sink.is_empty() { break }
trace!(target: "sync", "Drained {} headers to import", sink.len());
for header in sink.drain(..) {
if let Err(e) = self.client.queue_header(header) {
@ -372,8 +389,12 @@ impl<L: LightChainClient> LightSync<L> {
// handle state transitions.
{
let chain_info = self.client.chain_info();
let best_td = chain_info.total_difficulty;
match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Rounds(SyncRound::Abort(reason)) => {
_ if self.best_seen.lock().as_ref().map_or(true, |&(_, td)| best_td >= td)
=> *state = SyncState::Idle,
SyncState::Rounds(SyncRound::Abort(reason, _)) => {
match reason {
AbortReason::BadScaffold(bad_peers) => {
debug!(target: "sync", "Disabling peers responsible for bad scaffold");
@ -394,7 +415,7 @@ impl<L: LightChainClient> LightSync<L> {
}
SyncState::AncestorSearch(AncestorSearch::Genesis) => {
// Same here.
let g_hash = self.client.chain_info().genesis_hash;
let g_hash = chain_info.genesis_hash;
*state = SyncState::Rounds(SyncRound::begin(0, g_hash));
}
SyncState::Idle => self.begin_search(&mut state),
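
For a sense of scale behind the "split the chain up into subchains" strategy described at the top of this file: with the constants in sync_round.rs below (ROUND_SKIP = 255, ROUND_FRAMES = 255), a round scaffolds at most 254 widely spaced headers and then fills the gaps between them as parallel subchains. A quick back-of-the-envelope check of how much chain one round can cover (this figure is inferred from the constants, not stated anywhere in the code):

fn main() {
    let round_skip: u64 = 255;   // blocks between scaffold entries
    let round_frames: u64 = 255; // scaffold frames per round

    let scaffold_headers = round_frames - 1;  // at most 254 per round
    let blocks_per_subchain = round_skip + 1; // each gap plus its scaffold header
    let blocks_per_round = scaffold_headers * blocks_per_subchain;

    assert_eq!(blocks_per_round, 65_024);
    println!("one full round spans up to {} blocks", blocks_per_round);
}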

View File

@ -18,6 +18,7 @@
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::fmt;
use ethcore::header::Header;
@ -29,9 +30,9 @@ use util::{Bytes, H256};
use super::response;
// amount of blocks between each scaffold entry.
/// amount of blocks between each scaffold entry.
// TODO: move these into parameters for `RoundStart::new`?
const ROUND_SKIP: u64 = 255;
pub const ROUND_SKIP: u64 = 255;
// amount of scaffold frames: these are the blank spaces in "X___X___X"
const ROUND_FRAMES: usize = 255;
@ -132,7 +133,7 @@ impl Fetcher {
let end = match sparse_headers.last().map(|h| (h.number(), h.hash())) {
Some(end) => end,
None => return SyncRound::abort(AbortReason::BadScaffold(contributors)),
None => return SyncRound::abort(AbortReason::BadScaffold(contributors), VecDeque::new()),
};
SyncRound::Fetch(Fetcher {
@ -217,10 +218,11 @@ impl Fetcher {
let subchain_parent = request.subchain_parent.1;
// check if the subchain portion has been completely filled.
if request.headers_request.max == 0 {
if parent_hash.map_or(true, |hash| hash != subchain_parent) {
let abort = AbortReason::BadScaffold(self.scaffold_contributors);
return SyncRound::Abort(abort);
return SyncRound::abort(abort, self.ready);
}
self.complete_requests.insert(subchain_parent, request);
@ -271,6 +273,7 @@ impl Fetcher {
headers.extend(self.ready.drain(0..max));
if self.sparse.is_empty() && self.ready.is_empty() {
trace!(target: "sync", "sync round complete. Starting anew from {:?}", self.end);
SyncRound::Start(RoundStart::new(self.end))
} else {
SyncRound::Fetch(self)
@ -309,7 +312,7 @@ impl RoundStart {
if self.sparse_headers.len() > 1 {
Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect())
} else {
SyncRound::Abort(AbortReason::NoResponses)
SyncRound::Abort(AbortReason::NoResponses, self.sparse_headers.into())
}
} else {
SyncRound::Start(self)
@ -375,14 +378,19 @@ impl RoundStart {
let start = (self.start_block.0 + 1)
+ self.sparse_headers.len() as u64 * (ROUND_SKIP + 1);
let max = (ROUND_FRAMES - 1) - self.sparse_headers.len();
let headers_request = HeadersRequest {
start: start.into(),
max: (ROUND_FRAMES - 1) - self.sparse_headers.len(),
max: max,
skip: ROUND_SKIP,
reverse: false,
};
if let Some(req_id) = dispatcher(headers_request.clone()) {
trace!(target: "sync", "Requesting scaffold: {} headers forward from {}, skip={}",
max, start, ROUND_SKIP);
self.pending_req = Some((req_id, headers_request));
}
}
@ -397,15 +405,15 @@ pub enum SyncRound {
Start(RoundStart),
/// Fetching intermediate blocks during a sync round.
Fetch(Fetcher),
/// Aborted.
Abort(AbortReason),
/// Aborted, along with any sequential headers already ready to drain.
Abort(AbortReason, VecDeque<Header>),
}
impl SyncRound {
fn abort(reason: AbortReason) -> Self {
trace!(target: "sync", "Aborting sync round: {:?}", reason);
fn abort(reason: AbortReason, remaining: VecDeque<Header>) -> Self {
trace!(target: "sync", "Aborting sync round: {:?}. To drain: {:?}", reason, remaining);
SyncRound::Abort(reason)
SyncRound::Abort(reason, remaining)
}
/// Begin sync rounds from a starting block.
@ -450,7 +458,23 @@ impl SyncRound {
pub fn drain(self, v: &mut Vec<Header>, max: Option<usize>) -> Self {
match self {
SyncRound::Fetch(fetcher) => fetcher.drain(v, max),
SyncRound::Abort(reason, mut remaining) => {
let len = ::std::cmp::min(max.unwrap_or(usize::max_value()), remaining.len());
v.extend(remaining.drain(..len));
SyncRound::Abort(reason, remaining)
}
other => other,
}
}
}
impl fmt::Debug for SyncRound {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
SyncRound::Start(ref state) => write!(f, "Scaffolding from {:?}", state.start_block),
SyncRound::Fetch(ref fetcher) => write!(f, "Filling scaffold up to {:?}", fetcher.end),
SyncRound::Abort(ref reason, ref remaining) =>
write!(f, "Aborted: {:?}, {} remain", reason, remaining.len()),
}
}
}
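
The scaffold request math in RoundStart (the start/max computation above) is easy to sanity-check in isolation. A worked example using the constants from this file; the anchor block and the count of sparse headers already fetched are arbitrary example values:

// Worked example of the scaffold request arithmetic.
const ROUND_SKIP: u64 = 255;
const ROUND_FRAMES: usize = 255;

fn main() {
    let start_block: u64 = 1_000;    // example anchor for the round
    let already_fetched: usize = 10; // sparse headers received so far

    // Each sparse header stands for ROUND_SKIP + 1 = 256 blocks, so the
    // next request begins just past what the round already covers.
    let start = (start_block + 1) + already_fetched as u64 * (ROUND_SKIP + 1);
    let max = (ROUND_FRAMES - 1) - already_fetched;

    assert_eq!(start, 3_561);
    assert_eq!(max, 244);
    println!("Requesting scaffold: {} headers forward from {}, skip={}",
             max, start, ROUND_SKIP);
}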

View File

@ -0,0 +1,19 @@
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
#![allow(dead_code)]
mod test_net;

View File

@ -0,0 +1,211 @@
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! TestNet peer definition.
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use light_sync::*;
use tests::helpers::{TestNet, Peer as PeerLike, TestPacket};
use ethcore::client::TestBlockChainClient;
use ethcore::spec::Spec;
use io::IoChannel;
use light::client::Client as LightClient;
use light::net::{LightProtocol, IoContext, Capabilities, Params as LightParams};
use light::net::buffer_flow::FlowParams;
use network::{NodeId, PeerId};
use util::RwLock;
const NETWORK_ID: u64 = 0xcafebabe;
struct TestIoContext<'a> {
queue: &'a RwLock<VecDeque<TestPacket>>,
sender: Option<PeerId>,
to_disconnect: RwLock<HashSet<PeerId>>,
}
impl<'a> IoContext for TestIoContext<'a> {
fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec<u8>) {
self.queue.write().push_back(TestPacket {
data: packet_body,
packet_id: packet_id,
recipient: peer,
})
}
fn respond(&self, packet_id: u8, packet_body: Vec<u8>) {
if let Some(sender) = self.sender {
self.send(sender, packet_id, packet_body);
}
}
fn disconnect_peer(&self, peer: PeerId) {
self.to_disconnect.write().insert(peer);
}
fn disable_peer(&self, peer: PeerId) { self.disconnect_peer(peer) }
fn protocol_version(&self, _peer: PeerId) -> Option<u8> { Some(::light::net::MAX_PROTOCOL_VERSION) }
fn persistent_peer_id(&self, _peer: PeerId) -> Option<NodeId> { unimplemented!() }
}
// peer-specific data.
enum PeerData {
Light(Arc<LightSync<LightClient>>, Arc<LightClient>),
Full(Arc<TestBlockChainClient>)
}
// test peer type.
// Either a full peer or a LES peer.
pub struct Peer {
proto: LightProtocol,
queue: RwLock<VecDeque<TestPacket>>,
data: PeerData,
}
impl Peer {
// create a new full-client peer for light client peers to sync to.
// buffer flow is made negligible.
pub fn new_full(chain: Arc<TestBlockChainClient>) -> Self {
let params = LightParams {
network_id: NETWORK_ID,
flow_params: FlowParams::free(),
capabilities: Capabilities {
serve_headers: true,
serve_chain_since: None,
serve_state_since: None,
tx_relay: true,
},
};
let proto = LightProtocol::new(chain.clone(), params);
Peer {
proto: proto,
queue: RwLock::new(VecDeque::new()),
data: PeerData::Full(chain),
}
}
// create a new light-client peer to sync to full peers.
pub fn new_light(chain: Arc<LightClient>) -> Self {
let sync = Arc::new(LightSync::new(chain.clone()).unwrap());
let params = LightParams {
network_id: NETWORK_ID,
flow_params: FlowParams::default(),
capabilities: Capabilities {
serve_headers: false,
serve_chain_since: None,
serve_state_since: None,
tx_relay: false,
},
};
let mut proto = LightProtocol::new(chain.clone(), params);
proto.add_handler(sync.clone());
Peer {
proto: proto,
queue: RwLock::new(VecDeque::new()),
data: PeerData::Light(sync, chain),
}
}
// get the chain from the client, asserting that it is a full node.
pub fn chain(&self) -> &TestBlockChainClient {
match self.data {
PeerData::Full(ref chain) => &*chain,
_ => panic!("Attempted to access full chain on light peer."),
}
}
// get the light chain from the peer, asserting that it is a light node.
pub fn light_chain(&self) -> &LightClient {
match self.data {
PeerData::Light(_, ref chain) => &*chain,
_ => panic!("Attempted to access light chain on full peer."),
}
}
// get a test Io context backed by this peer's packet queue, with an optional sending peer.
fn io(&self, sender: Option<PeerId>) -> TestIoContext {
TestIoContext {
queue: &self.queue,
sender: sender,
to_disconnect: RwLock::new(HashSet::new()),
}
}
}
impl PeerLike for Peer {
type Message = TestPacket;
fn on_connect(&self, other: PeerId) {
let io = self.io(Some(other));
self.proto.on_connect(&other, &io);
}
fn on_disconnect(&self, other: PeerId){
let io = self.io(Some(other));
self.proto.on_disconnect(other, &io);
}
fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> {
let io = self.io(Some(from));
self.proto.handle_packet(&io, &from, msg.packet_id, &msg.data);
io.to_disconnect.into_inner()
}
fn pending_message(&self) -> Option<TestPacket> {
self.queue.write().pop_front()
}
fn is_done(&self) -> bool {
self.queue.read().is_empty()
}
fn sync_step(&self) {
if let PeerData::Light(_, ref client) = self.data {
client.flush_queue();
client.import_verified();
}
}
fn restart_sync(&self) { }
}
impl TestNet<Peer> {
/// Create a new `TestNet` for testing light synchronization.
/// The first parameter is the number of light nodes,
/// the second is the number of full nodes.
pub fn light(n_light: usize, n_full: usize) -> Self {
let mut peers = Vec::with_capacity(n_light + n_full);
for _ in 0..n_light {
let client = LightClient::new(Default::default(), &Spec::new_test(), IoChannel::disconnected());
peers.push(Arc::new(Peer::new_light(Arc::new(client))))
}
for _ in 0..n_full {
peers.push(Arc::new(Peer::new_full(Arc::new(TestBlockChainClient::new()))))
}
TestNet {
peers: peers,
started: false,
disconnect_events: Vec::new(),
}
}
}
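
Putting the pieces together, a test can stand up an in-process network of light and full peers and shuttle packets between them until the light client catches up. A hedged sketch of such a test: TestNet::light, chain(), light_chain() and sync_step() come from this file, while the peer(i) accessor and the sync() driver are assumed to behave like the existing full-sync test helpers.

// Hypothetical test sketch: one light peer syncing from one full peer.
// Imports as in the sibling test modules (TestNet, EachBlockWith, ...).
// `net.sync()` is assumed to deliver queued packets (calling `sync_step`
// along the way) until every peer's queue is empty.
#[test]
fn light_peer_follows_full_peer() {
    let mut net = TestNet::light(1, 1); // peer 0 is light, peer 1 is full

    // Give the full peer a chain worth serving.
    net.peer(1).chain().add_blocks(50, EachBlockWith::Nothing);

    // Exchange handshakes, requests and responses until both sides go quiet.
    net.sync();

    // The light client should have imported headers up to the served head.
    let light_head = net.peer(0).light_chain().chain_info().best_block_number;
    let full_head = net.peer(1).chain().chain_info().best_block_number;
    assert_eq!(light_head, full_head);
}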

View File

@ -100,8 +100,11 @@ fn forked() {
fn forked_with_misbehaving_peer() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
let mut alt_spec = ::ethcore::spec::Spec::new_test();
alt_spec.extra_data = b"fork".to_vec();
// peer 0 is on a totally different chain with higher total difficulty
net.peer_mut(0).chain = Arc::new(TestBlockChainClient::new_with_extra_data(b"fork".to_vec()));
net.peer_mut(0).chain = Arc::new(TestBlockChainClient::new_with_spec(alt_spec));
net.peer(0).chain.add_blocks(50, EachBlockWith::Nothing);
net.peer(1).chain.add_blocks(10, EachBlockWith::Nothing);
net.peer(2).chain.add_blocks(10, EachBlockWith::Nothing);