Merge pull request #3755 from ethcore/lightserv

LES Part 3: Event handlers and handling responses
This commit is contained in:
Robert Habermeier 2016-12-10 00:40:46 +01:00 committed by GitHub
commit 4d25445af5
32 changed files with 1444 additions and 281 deletions

17
Cargo.lock generated
View File

@ -18,6 +18,7 @@ dependencies = [
"ethcore-ipc-hypervisor 1.2.0", "ethcore-ipc-hypervisor 1.2.0",
"ethcore-ipc-nano 1.5.0", "ethcore-ipc-nano 1.5.0",
"ethcore-ipc-tests 0.1.0", "ethcore-ipc-tests 0.1.0",
"ethcore-light 1.5.0",
"ethcore-logger 1.5.0", "ethcore-logger 1.5.0",
"ethcore-rpc 1.5.0", "ethcore-rpc 1.5.0",
"ethcore-signer 1.5.0", "ethcore-signer 1.5.0",
@ -456,6 +457,21 @@ dependencies = [
"semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "ethcore-light"
version = "1.5.0"
dependencies = [
"ethcore 1.5.0",
"ethcore-io 1.5.0",
"ethcore-ipc 1.5.0",
"ethcore-ipc-codegen 1.5.0",
"ethcore-network 1.5.0",
"ethcore-util 1.5.0",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"rlp 0.1.0",
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "ethcore-logger" name = "ethcore-logger"
version = "1.5.0" version = "1.5.0"
@ -665,6 +681,7 @@ dependencies = [
"ethcore-ipc 1.5.0", "ethcore-ipc 1.5.0",
"ethcore-ipc-codegen 1.5.0", "ethcore-ipc-codegen 1.5.0",
"ethcore-ipc-nano 1.5.0", "ethcore-ipc-nano 1.5.0",
"ethcore-light 1.5.0",
"ethcore-network 1.5.0", "ethcore-network 1.5.0",
"ethcore-util 1.5.0", "ethcore-util 1.5.0",
"ethkey 0.2.0", "ethkey 0.2.0",

View File

@ -47,6 +47,7 @@ rlp = { path = "util/rlp" }
ethcore-stratum = { path = "stratum" } ethcore-stratum = { path = "stratum" }
ethcore-dapps = { path = "dapps", optional = true } ethcore-dapps = { path = "dapps", optional = true }
clippy = { version = "0.0.103", optional = true} clippy = { version = "0.0.103", optional = true}
ethcore-light = { path = "ethcore/light" }
[target.'cfg(windows)'.dependencies] [target.'cfg(windows)'.dependencies]
winapi = "0.2" winapi = "0.2"

View File

@ -8,14 +8,18 @@ authors = ["Ethcore <admin@ethcore.io>"]
build = "build.rs" build = "build.rs"
[build-dependencies] [build-dependencies]
"ethcore-ipc-codegen" = { path = "../../ipc/codegen" } "ethcore-ipc-codegen" = { path = "../../ipc/codegen", optional = true }
[dependencies] [dependencies]
log = "0.3" log = "0.3"
ethcore = { path = ".." } ethcore = { path = ".."}
ethcore-util = { path = "../../util" } ethcore-util = { path = "../../util" }
ethcore-network = { path = "../../util/network" } ethcore-network = { path = "../../util/network" }
ethcore-io = { path = "../../util/io" } ethcore-io = { path = "../../util/io" }
ethcore-ipc = { path = "../../ipc/rpc" } ethcore-ipc = { path = "../../ipc/rpc", optional = true }
rlp = { path = "../../util/rlp" } rlp = { path = "../../util/rlp" }
time = "0.1" time = "0.1"
[features]
default = []
ipc = ["ethcore-ipc", "ethcore-ipc-codegen"]

View File

@ -14,8 +14,14 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
#[cfg(feature = "ipc")]
extern crate ethcore_ipc_codegen; extern crate ethcore_ipc_codegen;
#[cfg(feature = "ipc")]
fn main() { fn main() {
ethcore_ipc_codegen::derive_binary("src/types/mod.rs.in").unwrap(); ethcore_ipc_codegen::derive_binary("src/types/mod.rs.in").unwrap();
ethcore_ipc_codegen::derive_ipc_cond("src/provider.rs", true).unwrap();
} }
#[cfg(not(feature = "ipc"))]
fn main() { }

View File

@ -33,8 +33,21 @@
pub mod client; pub mod client;
pub mod net; pub mod net;
#[cfg(not(feature = "ipc"))]
pub mod provider; pub mod provider;
#[cfg(feature = "ipc")]
pub mod provider {
#![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues
include!(concat!(env!("OUT_DIR"), "/provider.rs"));
}
#[cfg(feature = "ipc")]
pub mod remote {
pub use provider::LightProviderClient;
}
mod types; mod types;
pub use self::provider::Provider; pub use self::provider::Provider;
@ -47,6 +60,8 @@ extern crate ethcore;
extern crate ethcore_util as util; extern crate ethcore_util as util;
extern crate ethcore_network as network; extern crate ethcore_network as network;
extern crate ethcore_io as io; extern crate ethcore_io as io;
extern crate ethcore_ipc as ipc;
extern crate rlp; extern crate rlp;
extern crate time; extern crate time;
#[cfg(feature = "ipc")]
extern crate ethcore_ipc as ipc;

View File

@ -22,6 +22,9 @@
//! //!
//! This module provides an interface for configuration of buffer //! This module provides an interface for configuration of buffer
//! flow costs and recharge rates. //! flow costs and recharge rates.
//!
//! Current default costs are picked completely arbitrarily, not based
//! on any empirical timings or mathematical models.
use request; use request;
use super::packet; use super::packet;
@ -273,6 +276,16 @@ impl FlowParams {
} }
} }
impl Default for FlowParams {
fn default() -> Self {
FlowParams {
limit: 50_000_000.into(),
costs: CostTable::default(),
recharge: 100_000.into(),
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -0,0 +1,120 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! I/O and event context generalizations.
use network::{NetworkContext, PeerId};
use super::{Announcement, LightProtocol, ReqId};
use super::error::Error;
use request::Request;
/// An I/O context which allows sending and receiving packets as well as
/// disconnecting peers. This is used as a generalization of the portions
/// of a p2p network which the light protocol structure makes use of.
pub trait IoContext {
/// Send a packet to a specific peer.
fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec<u8>);
/// Respond to a peer's message. Only works if this context is a byproduct
/// of a packet handler.
fn respond(&self, packet_id: u8, packet_body: Vec<u8>);
/// Disconnect a peer.
fn disconnect_peer(&self, peer: PeerId);
/// Disable a peer -- this is a disconnect + a time-out.
fn disable_peer(&self, peer: PeerId);
/// Get a peer's protocol version.
fn protocol_version(&self, peer: PeerId) -> Option<u8>;
}
impl<'a> IoContext for NetworkContext<'a> {
fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec<u8>) {
if let Err(e) = self.send(peer, packet_id, packet_body) {
debug!(target: "les", "Error sending packet to peer {}: {}", peer, e);
}
}
fn respond(&self, packet_id: u8, packet_body: Vec<u8>) {
if let Err(e) = self.respond(packet_id, packet_body) {
debug!(target: "les", "Error responding to peer message: {}", e);
}
}
fn disconnect_peer(&self, peer: PeerId) {
NetworkContext::disconnect_peer(self, peer);
}
fn disable_peer(&self, peer: PeerId) {
NetworkContext::disable_peer(self, peer);
}
fn protocol_version(&self, peer: PeerId) -> Option<u8> {
self.protocol_version(self.subprotocol_name(), peer)
}
}
/// Context for a protocol event.
pub trait EventContext {
/// Get the peer relevant to the event e.g. message sender,
/// disconnected/connected peer.
fn peer(&self) -> PeerId;
/// Make a request from a peer.
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error>;
/// Make an announcement of new capabilities to the rest of the peers.
// TODO: maybe just put this on a timer in LightProtocol?
fn make_announcement(&self, announcement: Announcement);
/// Disconnect a peer.
fn disconnect_peer(&self, peer: PeerId);
/// Disable a peer.
fn disable_peer(&self, peer: PeerId);
}
/// Concrete implementation of `EventContext` over the light protocol struct and
/// an io context.
pub struct Ctx<'a> {
/// Io context to enable immediate response to events.
pub io: &'a IoContext,
/// Protocol implementation.
pub proto: &'a LightProtocol,
/// Relevant peer for event.
pub peer: PeerId,
}
impl<'a> EventContext for Ctx<'a> {
fn peer(&self) -> PeerId { self.peer }
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error> {
self.proto.request_from(self.io, &peer, request)
}
fn make_announcement(&self, announcement: Announcement) {
self.proto.make_announcement(self.io, announcement);
}
fn disconnect_peer(&self, peer: PeerId) {
self.io.disconnect_peer(peer);
}
fn disable_peer(&self, peer: PeerId) {
self.io.disable_peer(peer);
}
}

View File

@ -54,6 +54,14 @@ pub enum Error {
WrongNetwork, WrongNetwork,
/// Unknown peer. /// Unknown peer.
UnknownPeer, UnknownPeer,
/// Unsolicited response.
UnsolicitedResponse,
/// Not a server.
NotServer,
/// Unsupported protocol version.
UnsupportedProtocolVersion(u8),
/// Bad protocol version.
BadProtocolVersion,
} }
impl Error { impl Error {
@ -67,6 +75,10 @@ impl Error {
Error::UnexpectedHandshake => Punishment::Disconnect, Error::UnexpectedHandshake => Punishment::Disconnect,
Error::WrongNetwork => Punishment::Disable, Error::WrongNetwork => Punishment::Disable,
Error::UnknownPeer => Punishment::Disconnect, Error::UnknownPeer => Punishment::Disconnect,
Error::UnsolicitedResponse => Punishment::Disable,
Error::NotServer => Punishment::Disable,
Error::UnsupportedProtocolVersion(_) => Punishment::Disable,
Error::BadProtocolVersion => Punishment::Disable,
} }
} }
} }
@ -92,7 +104,11 @@ impl fmt::Display for Error {
Error::UnrecognizedPacket(code) => write!(f, "Unrecognized packet: 0x{:x}", code), Error::UnrecognizedPacket(code) => write!(f, "Unrecognized packet: 0x{:x}", code),
Error::UnexpectedHandshake => write!(f, "Unexpected handshake"), Error::UnexpectedHandshake => write!(f, "Unexpected handshake"),
Error::WrongNetwork => write!(f, "Wrong network"), Error::WrongNetwork => write!(f, "Wrong network"),
Error::UnknownPeer => write!(f, "unknown peer"), Error::UnknownPeer => write!(f, "Unknown peer"),
Error::UnsolicitedResponse => write!(f, "Peer provided unsolicited data"),
Error::NotServer => write!(f, "Peer not a server."),
Error::UnsupportedProtocolVersion(pv) => write!(f, "Unsupported protocol version: {}", pv),
Error::BadProtocolVersion => write!(f, "Bad protocol version in handshake"),
} }
} }
} }

View File

@ -20,36 +20,51 @@
//! See https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES) //! See https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES)
use ethcore::transaction::SignedTransaction; use ethcore::transaction::SignedTransaction;
use ethcore::receipt::Receipt;
use io::TimerToken; use io::TimerToken;
use network::{NetworkProtocolHandler, NetworkContext, NetworkError, PeerId}; use network::{NetworkProtocolHandler, NetworkContext, PeerId};
use rlp::{RlpStream, Stream, UntrustedRlp, View}; use rlp::{RlpStream, Stream, UntrustedRlp, View};
use util::hash::H256; use util::hash::H256;
use util::{Mutex, RwLock, U256}; use util::{Bytes, Mutex, RwLock, U256};
use time::SteadyTime; use time::{Duration, SteadyTime};
use std::collections::{HashMap, HashSet}; use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use provider::Provider; use provider::Provider;
use request::{self, Request}; use request::{self, Request};
use self::buffer_flow::{Buffer, FlowParams}; use self::buffer_flow::{Buffer, FlowParams};
use self::context::Ctx;
use self::error::{Error, Punishment}; use self::error::{Error, Punishment};
mod buffer_flow; mod buffer_flow;
mod context;
mod error; mod error;
mod status; mod status;
pub use self::status::{Status, Capabilities, Announcement, NetworkId}; #[cfg(test)]
mod tests;
pub use self::context::{EventContext, IoContext};
pub use self::status::{Status, Capabilities, Announcement};
const TIMEOUT: TimerToken = 0; const TIMEOUT: TimerToken = 0;
const TIMEOUT_INTERVAL_MS: u64 = 1000; const TIMEOUT_INTERVAL_MS: u64 = 1000;
// LPV1 // minimum interval between updates.
const PROTOCOL_VERSION: u32 = 1; const UPDATE_INTERVAL_MS: i64 = 5000;
// TODO [rob] make configurable. // Supported protocol versions.
const PROTOCOL_ID: [u8; 3] = *b"les"; pub const PROTOCOL_VERSIONS: &'static [u8] = &[1];
// Max protocol version.
pub const MAX_PROTOCOL_VERSION: u8 = 1;
// Packet count for LES.
pub const PACKET_COUNT: u8 = 15;
// packet ID definitions. // packet ID definitions.
mod packet { mod packet {
@ -95,17 +110,19 @@ pub struct ReqId(usize);
// may not have received one for. // may not have received one for.
struct PendingPeer { struct PendingPeer {
sent_head: H256, sent_head: H256,
last_update: SteadyTime,
proto_version: u8,
} }
// data about each peer. // data about each peer.
struct Peer { struct Peer {
local_buffer: Buffer, // their buffer relative to us local_buffer: Buffer, // their buffer relative to us
remote_buffer: Buffer, // our buffer relative to them
current_asking: HashSet<usize>, // pending request ids.
status: Status, status: Status,
capabilities: Capabilities, capabilities: Capabilities,
remote_flow: FlowParams, remote_flow: Option<(Buffer, FlowParams)>,
sent_head: H256, // last head we've given them. sent_head: H256, // last head we've given them.
last_update: SteadyTime,
proto_version: u8,
} }
impl Peer { impl Peer {
@ -126,38 +143,56 @@ impl Peer {
self.local_buffer.current() self.local_buffer.current()
} }
// recharge remote buffer with remote flow params.
fn recharge_remote(&mut self) {
let flow = &mut self.remote_flow;
flow.recharge(&mut self.remote_buffer);
}
} }
/// An LES event handler. /// An LES event handler.
///
/// Each handler function takes a context which describes the relevant peer
/// and gives references to the IO layer and protocol structure so new messages
/// can be dispatched immediately.
///
/// Request responses are not guaranteed to be complete or valid, but passed IDs will be correct.
/// Response handlers are not given a copy of the original request; it is assumed
/// that relevant data will be stored by interested handlers.
pub trait Handler: Send + Sync { pub trait Handler: Send + Sync {
/// Called when a peer connects. /// Called when a peer connects.
fn on_connect(&self, _id: PeerId, _status: &Status, _capabilities: &Capabilities) { } fn on_connect(&self, _ctx: &EventContext, _status: &Status, _capabilities: &Capabilities) { }
/// Called when a peer disconnects /// Called when a peer disconnects, with a list of unfulfilled request IDs as
fn on_disconnect(&self, _id: PeerId) { } /// of yet.
fn on_disconnect(&self, _ctx: &EventContext, _unfulfilled: &[ReqId]) { }
/// Called when a peer makes an announcement. /// Called when a peer makes an announcement.
fn on_announcement(&self, _id: PeerId, _announcement: &Announcement) { } fn on_announcement(&self, _ctx: &EventContext, _announcement: &Announcement) { }
/// Called when a peer requests relay of some transactions. /// Called when a peer requests relay of some transactions.
fn on_transactions(&self, _id: PeerId, _relay: &[SignedTransaction]) { } fn on_transactions(&self, _ctx: &EventContext, _relay: &[SignedTransaction]) { }
/// Called when a peer responds with block bodies.
fn on_block_bodies(&self, _ctx: &EventContext, _req_id: ReqId, _bodies: &[Bytes]) { }
/// Called when a peer responds with block headers.
fn on_block_headers(&self, _ctx: &EventContext, _req_id: ReqId, _headers: &[Bytes]) { }
/// Called when a peer responds with block receipts.
fn on_receipts(&self, _ctx: &EventContext, _req_id: ReqId, _receipts: &[Vec<Receipt>]) { }
/// Called when a peer responds with state proofs. Each proof is a series of trie
/// nodes in ascending order by distance from the root.
fn on_state_proofs(&self, _ctx: &EventContext, _req_id: ReqId, _proofs: &[Vec<Bytes>]) { }
/// Called when a peer responds with contract code.
fn on_code(&self, _ctx: &EventContext, _req_id: ReqId, _codes: &[Bytes]) { }
/// Called when a peer responds with header proofs. Each proof is a block header coupled
/// with a series of trie nodes is ascending order by distance from the root.
fn on_header_proofs(&self, _ctx: &EventContext, _req_id: ReqId, _proofs: &[(Bytes, Vec<Bytes>)]) { }
/// Called on abort.
fn on_abort(&self) { }
} }
// a request and the time it was made. // a request, the peer who it was made to, and the time it was made.
struct Requested { struct Requested {
request: Request, request: Request,
timestamp: SteadyTime, timestamp: SteadyTime,
peer_id: PeerId,
} }
/// Protocol parameters. /// Protocol parameters.
pub struct Params { pub struct Params {
/// Genesis hash.
pub genesis_hash: H256,
/// Network id. /// Network id.
pub network_id: NetworkId, pub network_id: u64,
/// Buffer flow parameters. /// Buffer flow parameters.
pub flow_params: FlowParams, pub flow_params: FlowParams,
/// Initial capabilities. /// Initial capabilities.
@ -175,9 +210,9 @@ pub struct Params {
// Locks must be acquired in the order declared, and when holding a read lock // Locks must be acquired in the order declared, and when holding a read lock
// on the peers, only one peer may be held at a time. // on the peers, only one peer may be held at a time.
pub struct LightProtocol { pub struct LightProtocol {
provider: Box<Provider>, provider: Arc<Provider>,
genesis_hash: H256, genesis_hash: H256,
network_id: NetworkId, network_id: u64,
pending_peers: RwLock<HashMap<PeerId, PendingPeer>>, pending_peers: RwLock<HashMap<PeerId, PendingPeer>>,
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, peers: RwLock<HashMap<PeerId, Mutex<Peer>>>,
pending_requests: RwLock<HashMap<usize, Requested>>, pending_requests: RwLock<HashMap<usize, Requested>>,
@ -189,10 +224,13 @@ pub struct LightProtocol {
impl LightProtocol { impl LightProtocol {
/// Create a new instance of the protocol manager. /// Create a new instance of the protocol manager.
pub fn new(provider: Box<Provider>, params: Params) -> Self { pub fn new(provider: Arc<Provider>, params: Params) -> Self {
debug!(target: "les", "Initializing LES handler");
let genesis_hash = provider.chain_info().genesis_hash;
LightProtocol { LightProtocol {
provider: provider, provider: provider,
genesis_hash: params.genesis_hash, genesis_hash: genesis_hash,
network_id: params.network_id, network_id: params.network_id,
pending_peers: RwLock::new(HashMap::new()), pending_peers: RwLock::new(HashMap::new()),
peers: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()),
@ -207,28 +245,37 @@ impl LightProtocol {
/// Check the maximum amount of requests of a specific type /// Check the maximum amount of requests of a specific type
/// which a peer would be able to serve. /// which a peer would be able to serve.
pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> Option<usize> { pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> Option<usize> {
self.peers.read().get(&peer).map(|peer| { self.peers.read().get(&peer).and_then(|peer| {
let mut peer = peer.lock(); let mut peer = peer.lock();
peer.recharge_remote(); match peer.remote_flow.as_mut() {
peer.remote_flow.max_amount(&peer.remote_buffer, kind) Some(&mut (ref mut buf, ref flow)) => {
flow.recharge(buf);
Some(flow.max_amount(&*buf, kind))
}
None => None,
}
}) })
} }
/// Make a request to a peer. /// Make a request to a peer.
/// ///
/// Fails on: nonexistent peer, network error, /// Fails on: nonexistent peer, network error, peer not server,
/// insufficient buffer. Does not check capabilities before sending. /// insufficient buffer. Does not check capabilities before sending.
/// On success, returns a request id which can later be coordinated /// On success, returns a request id which can later be coordinated
/// with an event. /// with an event.
pub fn request_from(&self, io: &NetworkContext, peer_id: &PeerId, request: Request) -> Result<ReqId, Error> { pub fn request_from(&self, io: &IoContext, peer_id: &PeerId, request: Request) -> Result<ReqId, Error> {
let peers = self.peers.read(); let peers = self.peers.read();
let peer = try!(peers.get(peer_id).ok_or_else(|| Error::UnknownPeer)); let peer = try!(peers.get(peer_id).ok_or_else(|| Error::UnknownPeer));
let mut peer = peer.lock(); let mut peer = peer.lock();
peer.recharge_remote(); match peer.remote_flow.as_mut() {
Some(&mut (ref mut buf, ref flow)) => {
let max = peer.remote_flow.compute_cost(request.kind(), request.amount()); flow.recharge(buf);
try!(peer.remote_buffer.deduct_cost(max)); let max = flow.compute_cost(request.kind(), request.amount());
try!(buf.deduct_cost(max));
}
None => return Err(Error::NotServer),
}
let req_id = self.req_id.fetch_add(1, Ordering::SeqCst); let req_id = self.req_id.fetch_add(1, Ordering::SeqCst);
let packet_data = encode_request(&request, req_id); let packet_data = encode_request(&request, req_id);
@ -242,12 +289,12 @@ impl LightProtocol {
request::Kind::HeaderProofs => packet::GET_HEADER_PROOFS, request::Kind::HeaderProofs => packet::GET_HEADER_PROOFS,
}; };
try!(io.send(*peer_id, packet_id, packet_data)); io.send(*peer_id, packet_id, packet_data);
peer.current_asking.insert(req_id);
self.pending_requests.write().insert(req_id, Requested { self.pending_requests.write().insert(req_id, Requested {
request: request, request: request,
timestamp: SteadyTime::now(), timestamp: SteadyTime::now(),
peer_id: *peer_id,
}); });
Ok(ReqId(req_id)) Ok(ReqId(req_id))
@ -255,8 +302,9 @@ impl LightProtocol {
/// Make an announcement of new chain head and capabilities to all peers. /// Make an announcement of new chain head and capabilities to all peers.
/// The announcement is expected to be valid. /// The announcement is expected to be valid.
pub fn make_announcement(&self, io: &NetworkContext, mut announcement: Announcement) { pub fn make_announcement(&self, io: &IoContext, mut announcement: Announcement) {
let mut reorgs_map = HashMap::new(); let mut reorgs_map = HashMap::new();
let now = SteadyTime::now();
// update stored capabilities // update stored capabilities
self.capabilities.write().update_from(&announcement); self.capabilities.write().update_from(&announcement);
@ -264,6 +312,17 @@ impl LightProtocol {
// calculate reorg info and send packets // calculate reorg info and send packets
for (peer_id, peer_info) in self.peers.read().iter() { for (peer_id, peer_info) in self.peers.read().iter() {
let mut peer_info = peer_info.lock(); let mut peer_info = peer_info.lock();
// TODO: "urgent" announcements like new blocks?
// the timer approach will skip 1 (possibly 2) in rare occasions.
if peer_info.sent_head == announcement.head_hash ||
peer_info.status.head_num >= announcement.head_num ||
now - peer_info.last_update < Duration::milliseconds(UPDATE_INTERVAL_MS) {
continue
}
peer_info.last_update = now;
let reorg_depth = reorgs_map.entry(peer_info.sent_head) let reorg_depth = reorgs_map.entry(peer_info.sent_head)
.or_insert_with(|| { .or_insert_with(|| {
match self.provider.reorg_depth(&announcement.head_hash, &peer_info.sent_head) { match self.provider.reorg_depth(&announcement.head_hash, &peer_info.sent_head) {
@ -281,26 +340,133 @@ impl LightProtocol {
peer_info.sent_head = announcement.head_hash; peer_info.sent_head = announcement.head_hash;
announcement.reorg_depth = *reorg_depth; announcement.reorg_depth = *reorg_depth;
if let Err(e) = io.send(*peer_id, packet::ANNOUNCE, status::write_announcement(&announcement)) { io.send(*peer_id, packet::ANNOUNCE, status::write_announcement(&announcement));
debug!(target: "les", "Error sending to peer {}: {}", peer_id, e);
}
} }
} }
/// Add an event handler. /// Add an event handler.
/// Ownership will be transferred to the protocol structure, /// Ownership will be transferred to the protocol structure,
/// and the handler will be kept alive as long as it is. /// and the handler will be kept alive as long as it is.
/// These are intended to be added at the beginning of the /// These are intended to be added when the protocol structure
/// is initialized as a means of customizing its behavior.
pub fn add_handler(&mut self, handler: Box<Handler>) { pub fn add_handler(&mut self, handler: Box<Handler>) {
self.handlers.push(handler); self.handlers.push(handler);
} }
/// Signal to handlers that network activity is being aborted
/// and clear peer data.
pub fn abort(&self) {
for handler in &self.handlers {
handler.on_abort();
}
// acquire in order and hold.
let mut pending_peers = self.pending_peers.write();
let mut peers = self.peers.write();
let mut pending_requests = self.pending_requests.write();
pending_peers.clear();
peers.clear();
pending_requests.clear();
}
// Does the common pre-verification of responses before the response itself
// is actually decoded:
// - check whether peer exists
// - check whether request was made
// - check whether request kinds match
fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result<ReqId, Error> {
let req_id: usize = try!(raw.val_at(0));
let cur_buffer: U256 = try!(raw.val_at(1));
trace!(target: "les", "pre-verifying response from peer {}, kind={:?}", peer, kind);
match self.pending_requests.write().remove(&req_id) {
None => return Err(Error::UnsolicitedResponse),
Some(requested) => {
if requested.peer_id != *peer || requested.request.kind() != kind {
return Err(Error::UnsolicitedResponse)
}
}
}
let peers = self.peers.read();
match peers.get(peer) {
Some(peer_info) => {
let mut peer_info = peer_info.lock();
match peer_info.remote_flow.as_mut() {
Some(&mut (ref mut buf, ref mut flow)) => {
let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit());
buf.update_to(actual_buffer)
}
None => return Err(Error::NotServer), // this really should be impossible.
}
Ok(ReqId(req_id))
}
None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind.
}
}
// handle a packet using the given io context.
fn handle_packet(&self, io: &IoContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
let rlp = UntrustedRlp::new(data);
trace!(target: "les", "Incoming packet {} from peer {}", packet_id, peer);
// handle the packet
let res = match packet_id {
packet::STATUS => self.status(peer, io, rlp),
packet::ANNOUNCE => self.announcement(peer, io, rlp),
packet::GET_BLOCK_HEADERS => self.get_block_headers(peer, io, rlp),
packet::BLOCK_HEADERS => self.block_headers(peer, io, rlp),
packet::GET_BLOCK_BODIES => self.get_block_bodies(peer, io, rlp),
packet::BLOCK_BODIES => self.block_bodies(peer, io, rlp),
packet::GET_RECEIPTS => self.get_receipts(peer, io, rlp),
packet::RECEIPTS => self.receipts(peer, io, rlp),
packet::GET_PROOFS => self.get_proofs(peer, io, rlp),
packet::PROOFS => self.proofs(peer, io, rlp),
packet::GET_CONTRACT_CODES => self.get_contract_code(peer, io, rlp),
packet::CONTRACT_CODES => self.contract_code(peer, io, rlp),
packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp),
packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp),
packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp),
other => {
Err(Error::UnrecognizedPacket(other))
}
};
// if something went wrong, figure out how much to punish the peer.
if let Err(e) = res {
match e.punishment() {
Punishment::None => {}
Punishment::Disconnect => {
debug!(target: "les", "Disconnecting peer {}: {}", peer, e);
io.disconnect_peer(*peer)
}
Punishment::Disable => {
debug!(target: "les", "Disabling peer {}: {}", peer, e);
io.disable_peer(*peer)
}
}
}
}
} }
impl LightProtocol { impl LightProtocol {
// called when a peer connects. // called when a peer connects.
fn on_connect(&self, peer: &PeerId, io: &NetworkContext) { fn on_connect(&self, peer: &PeerId, io: &IoContext) {
let peer = *peer; let peer = *peer;
trace!(target: "les", "Peer {} connecting", peer);
match self.send_status(peer, io) { match self.send_status(peer, io) {
Ok(pending_peer) => { Ok(pending_peer) => {
self.pending_peers.write().insert(peer, pending_peer); self.pending_peers.write().insert(peer, pending_peer);
@ -313,44 +479,69 @@ impl LightProtocol {
} }
// called when a peer disconnects. // called when a peer disconnects.
fn on_disconnect(&self, peer: PeerId) { fn on_disconnect(&self, peer: PeerId, io: &IoContext) {
// TODO: reassign all requests assigned to this peer. trace!(target: "les", "Peer {} disconnecting", peer);
self.pending_peers.write().remove(&peer); self.pending_peers.write().remove(&peer);
if self.peers.write().remove(&peer).is_some() { if self.peers.write().remove(&peer).is_some() {
let unfulfilled: Vec<_> = self.pending_requests.read()
.iter()
.filter(|&(_, r)| r.peer_id == peer)
.map(|(&id, _)| ReqId(id))
.collect();
{
let mut pending = self.pending_requests.write();
for &ReqId(ref inner) in &unfulfilled {
pending.remove(inner);
}
}
for handler in &self.handlers { for handler in &self.handlers {
handler.on_disconnect(peer) handler.on_disconnect(&Ctx {
peer: peer,
io: io,
proto: self,
}, &unfulfilled)
} }
} }
} }
// send status to a peer. // send status to a peer.
fn send_status(&self, peer: PeerId, io: &NetworkContext) -> Result<PendingPeer, NetworkError> { fn send_status(&self, peer: PeerId, io: &IoContext) -> Result<PendingPeer, Error> {
let chain_info = self.provider.chain_info(); let proto_version = try!(io.protocol_version(peer).ok_or(Error::WrongNetwork));
// TODO: could update capabilities here. if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() {
return Err(Error::UnsupportedProtocolVersion(proto_version));
}
let chain_info = self.provider.chain_info();
let status = Status { let status = Status {
head_td: chain_info.total_difficulty, head_td: chain_info.total_difficulty,
head_hash: chain_info.best_block_hash, head_hash: chain_info.best_block_hash,
head_num: chain_info.best_block_number, head_num: chain_info.best_block_number,
genesis_hash: chain_info.genesis_hash, genesis_hash: chain_info.genesis_hash,
protocol_version: PROTOCOL_VERSION, protocol_version: proto_version as u32, // match peer proto version
network_id: self.network_id, network_id: self.network_id,
last_head: None, last_head: None,
}; };
let capabilities = self.capabilities.read().clone(); let capabilities = self.capabilities.read().clone();
let status_packet = status::write_handshake(&status, &capabilities, &self.flow_params); let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params));
try!(io.send(peer, packet::STATUS, status_packet)); io.send(peer, packet::STATUS, status_packet);
Ok(PendingPeer { Ok(PendingPeer {
sent_head: chain_info.best_block_hash, sent_head: chain_info.best_block_hash,
last_update: SteadyTime::now(),
proto_version: proto_version,
}) })
} }
// Handle status message from peer. // Handle status message from peer.
fn status(&self, peer: &PeerId, data: UntrustedRlp) -> Result<(), Error> { fn status(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
let pending = match self.pending_peers.write().remove(peer) { let pending = match self.pending_peers.write().remove(peer) {
Some(pending) => pending, Some(pending) => pending,
None => { None => {
@ -366,33 +557,45 @@ impl LightProtocol {
return Err(Error::WrongNetwork); return Err(Error::WrongNetwork);
} }
if Some(status.protocol_version as u8) != io.protocol_version(*peer) {
return Err(Error::BadProtocolVersion);
}
let remote_flow = flow_params.map(|params| (params.create_buffer(), params));
self.peers.write().insert(*peer, Mutex::new(Peer { self.peers.write().insert(*peer, Mutex::new(Peer {
local_buffer: self.flow_params.create_buffer(), local_buffer: self.flow_params.create_buffer(),
remote_buffer: flow_params.create_buffer(),
current_asking: HashSet::new(),
status: status.clone(), status: status.clone(),
capabilities: capabilities.clone(), capabilities: capabilities.clone(),
remote_flow: flow_params, remote_flow: remote_flow,
sent_head: pending.sent_head, sent_head: pending.sent_head,
last_update: pending.last_update,
proto_version: pending.proto_version,
})); }));
for handler in &self.handlers { for handler in &self.handlers {
handler.on_connect(*peer, &status, &capabilities) handler.on_connect(&Ctx {
peer: *peer,
io: io,
proto: self,
}, &status, &capabilities)
} }
Ok(()) Ok(())
} }
// Handle an announcement. // Handle an announcement.
fn announcement(&self, peer: &PeerId, data: UntrustedRlp) -> Result<(), Error> { fn announcement(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
if !self.peers.read().contains_key(peer) { if !self.peers.read().contains_key(peer) {
debug!(target: "les", "Ignoring announcement from unknown peer"); debug!(target: "les", "Ignoring announcement from unknown peer");
return Ok(()) return Ok(())
} }
let announcement = try!(status::parse_announcement(data)); let announcement = try!(status::parse_announcement(data));
let peers = self.peers.read();
// scope to ensure locks are dropped before moving into handler-space.
{
let peers = self.peers.read();
let peer_info = match peers.get(peer) { let peer_info = match peers.get(peer) {
Some(info) => info, Some(info) => info,
None => return Ok(()), None => return Ok(()),
@ -413,16 +616,21 @@ impl LightProtocol {
// update capabilities. // update capabilities.
peer_info.capabilities.update_from(&announcement); peer_info.capabilities.update_from(&announcement);
}
for handler in &self.handlers { for handler in &self.handlers {
handler.on_announcement(*peer, &announcement); handler.on_announcement(&Ctx {
peer: *peer,
io: io,
proto: self,
}, &announcement);
} }
Ok(()) Ok(())
} }
// Handle a request for block headers. // Handle a request for block headers.
fn get_block_headers(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { fn get_block_headers(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_HEADERS: usize = 512; const MAX_HEADERS: usize = 512;
let peers = self.peers.read(); let peers = self.peers.read();
@ -467,16 +675,29 @@ impl LightProtocol {
} }
stream.out() stream.out()
}).map_err(Into::into) });
Ok(())
} }
// Receive a response for block headers. // Receive a response for block headers.
fn block_headers(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { fn block_headers(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
unimplemented!() let req_id = try!(self.pre_verify_response(peer, request::Kind::Headers, &raw));
let raw_headers: Vec<_> = raw.iter().skip(2).map(|x| x.as_raw().to_owned()).collect();
for handler in &self.handlers {
handler.on_block_headers(&Ctx {
peer: *peer,
io: io,
proto: self,
}, req_id, &raw_headers);
}
Ok(())
} }
// Handle a request for block bodies. // Handle a request for block bodies.
fn get_block_bodies(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { fn get_block_bodies(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_BODIES: usize = 256; const MAX_BODIES: usize = 256;
let peers = self.peers.read(); let peers = self.peers.read();
@ -513,16 +734,29 @@ impl LightProtocol {
} }
stream.out() stream.out()
}).map_err(Into::into) });
Ok(())
} }
// Receive a response for block bodies. // Receive a response for block bodies.
fn block_bodies(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { fn block_bodies(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
unimplemented!() let req_id = try!(self.pre_verify_response(peer, request::Kind::Bodies, &raw));
let raw_bodies: Vec<Bytes> = raw.iter().skip(2).map(|x| x.as_raw().to_owned()).collect();
for handler in &self.handlers {
handler.on_block_bodies(&Ctx {
peer: *peer,
io: io,
proto: self,
}, req_id, &raw_bodies);
}
Ok(())
} }
// Handle a request for receipts. // Handle a request for receipts.
fn get_receipts(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { fn get_receipts(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_RECEIPTS: usize = 256; const MAX_RECEIPTS: usize = 256;
let peers = self.peers.read(); let peers = self.peers.read();
@ -559,16 +793,33 @@ impl LightProtocol {
} }
stream.out() stream.out()
}).map_err(Into::into) });
Ok(())
} }
// Receive a response for receipts. // Receive a response for receipts.
fn receipts(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { fn receipts(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
unimplemented!() let req_id = try!(self.pre_verify_response(peer, request::Kind::Receipts, &raw));
let raw_receipts: Vec<Vec<Receipt>> = try!(raw
.iter()
.skip(2)
.map(|x| x.as_val())
.collect());
for handler in &self.handlers {
handler.on_receipts(&Ctx {
peer: *peer,
io: io,
proto: self,
}, req_id, &raw_receipts);
}
Ok(())
} }
// Handle a request for proofs. // Handle a request for proofs.
fn get_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { fn get_proofs(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_PROOFS: usize = 128; const MAX_PROOFS: usize = 128;
let peers = self.peers.read(); let peers = self.peers.read();
@ -616,16 +867,33 @@ impl LightProtocol {
} }
stream.out() stream.out()
}).map_err(Into::into) });
Ok(())
} }
// Receive a response for proofs. // Receive a response for proofs.
fn proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { fn proofs(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
unimplemented!() let req_id = try!(self.pre_verify_response(peer, request::Kind::StateProofs, &raw));
let raw_proofs: Vec<Vec<Bytes>> = raw.iter()
.skip(2)
.map(|x| x.iter().map(|node| node.as_raw().to_owned()).collect())
.collect();
for handler in &self.handlers {
handler.on_state_proofs(&Ctx {
peer: *peer,
io: io,
proto: self,
}, req_id, &raw_proofs);
}
Ok(())
} }
// Handle a request for contract code. // Handle a request for contract code.
fn get_contract_code(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { fn get_contract_code(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_CODES: usize = 256; const MAX_CODES: usize = 256;
let peers = self.peers.read(); let peers = self.peers.read();
@ -667,20 +935,34 @@ impl LightProtocol {
stream.append(&req_id).append(&cur_buffer); stream.append(&req_id).append(&cur_buffer);
for code in response { for code in response {
stream.append_raw(&code, 1); stream.append(&code);
} }
stream.out() stream.out()
}).map_err(Into::into) });
Ok(())
} }
// Receive a response for contract code. // Receive a response for contract code.
fn contract_code(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { fn contract_code(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
unimplemented!() let req_id = try!(self.pre_verify_response(peer, request::Kind::Codes, &raw));
let raw_code: Vec<Bytes> = try!(raw.iter().skip(2).map(|x| x.as_val()).collect());
for handler in &self.handlers {
handler.on_code(&Ctx {
peer: *peer,
io: io,
proto: self,
}, req_id, &raw_code);
}
Ok(())
} }
// Handle a request for header proofs // Handle a request for header proofs
fn get_header_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> { fn get_header_proofs(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_PROOFS: usize = 256; const MAX_PROOFS: usize = 256;
let peers = self.peers.read(); let peers = self.peers.read();
@ -727,16 +1009,37 @@ impl LightProtocol {
} }
stream.out() stream.out()
}).map_err(Into::into) });
Ok(())
} }
// Receive a response for header proofs // Receive a response for header proofs
fn header_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> { fn header_proofs(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
unimplemented!() fn decode_res(raw: UntrustedRlp) -> Result<(Bytes, Vec<Bytes>), ::rlp::DecoderError> {
Ok((
try!(raw.val_at(0)),
try!(raw.at(1)).iter().map(|x| x.as_raw().to_owned()).collect(),
))
}
let req_id = try!(self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw));
let raw_proofs: Vec<_> = try!(raw.iter().skip(2).map(decode_res).collect());
for handler in &self.handlers {
handler.on_header_proofs(&Ctx {
peer: *peer,
io: io,
proto: self,
}, req_id, &raw_proofs);
}
Ok(())
} }
// Receive a set of transactions to relay. // Receive a set of transactions to relay.
fn relay_transactions(&self, peer: &PeerId, data: UntrustedRlp) -> Result<(), Error> { fn relay_transactions(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_TRANSACTIONS: usize = 256; const MAX_TRANSACTIONS: usize = 256;
let txs: Vec<_> = try!(data.iter().take(MAX_TRANSACTIONS).map(|x| x.as_val::<SignedTransaction>()).collect()); let txs: Vec<_> = try!(data.iter().take(MAX_TRANSACTIONS).map(|x| x.as_val::<SignedTransaction>()).collect());
@ -744,7 +1047,11 @@ impl LightProtocol {
debug!(target: "les", "Received {} transactions to relay from peer {}", txs.len(), peer); debug!(target: "les", "Received {} transactions to relay from peer {}", txs.len(), peer);
for handler in &self.handlers { for handler in &self.handlers {
handler.on_transactions(*peer, &txs); handler.on_transactions(&Ctx {
peer: *peer,
io: io,
proto: self,
}, &txs);
} }
Ok(()) Ok(())
@ -757,60 +1064,15 @@ impl NetworkProtocolHandler for LightProtocol {
} }
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
let rlp = UntrustedRlp::new(data); self.handle_packet(io, peer, packet_id, data);
// handle the packet
let res = match packet_id {
packet::STATUS => self.status(peer, rlp),
packet::ANNOUNCE => self.announcement(peer, rlp),
packet::GET_BLOCK_HEADERS => self.get_block_headers(peer, io, rlp),
packet::BLOCK_HEADERS => self.block_headers(peer, io, rlp),
packet::GET_BLOCK_BODIES => self.get_block_bodies(peer, io, rlp),
packet::BLOCK_BODIES => self.block_bodies(peer, io, rlp),
packet::GET_RECEIPTS => self.get_receipts(peer, io, rlp),
packet::RECEIPTS => self.receipts(peer, io, rlp),
packet::GET_PROOFS => self.get_proofs(peer, io, rlp),
packet::PROOFS => self.proofs(peer, io, rlp),
packet::GET_CONTRACT_CODES => self.get_contract_code(peer, io, rlp),
packet::CONTRACT_CODES => self.contract_code(peer, io, rlp),
packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp),
packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp),
packet::SEND_TRANSACTIONS => self.relay_transactions(peer, rlp),
other => {
Err(Error::UnrecognizedPacket(other))
}
};
// if something went wrong, figure out how much to punish the peer.
if let Err(e) = res {
match e.punishment() {
Punishment::None => {}
Punishment::Disconnect => {
debug!(target: "les", "Disconnecting peer {}: {}", peer, e);
io.disconnect_peer(*peer)
}
Punishment::Disable => {
debug!(target: "les", "Disabling peer {}: {}", peer, e);
io.disable_peer(*peer)
}
}
}
} }
fn connected(&self, io: &NetworkContext, peer: &PeerId) { fn connected(&self, io: &NetworkContext, peer: &PeerId) {
self.on_connect(peer, io); self.on_connect(peer, io);
} }
fn disconnected(&self, _io: &NetworkContext, peer: &PeerId) { fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
self.on_disconnect(*peer); self.on_disconnect(*peer, io);
} }
fn timeout(&self, _io: &NetworkContext, timer: TimerToken) { fn timeout(&self, _io: &NetworkContext, timer: TimerToken) {

View File

@ -82,26 +82,6 @@ impl Key {
} }
} }
/// Network ID structure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u32)]
pub enum NetworkId {
/// ID for the mainnet
Mainnet = 1,
/// ID for the testnet
Testnet = 0,
}
impl NetworkId {
fn from_raw(raw: u32) -> Option<Self> {
match raw {
0 => Some(NetworkId::Testnet),
1 => Some(NetworkId::Mainnet),
_ => None,
}
}
}
// helper for decoding key-value pairs in the handshake or an announcement. // helper for decoding key-value pairs in the handshake or an announcement.
struct Parser<'a> { struct Parser<'a> {
pos: usize, pos: usize,
@ -118,6 +98,7 @@ impl<'a> Parser<'a> {
// expect a specific next key, and get the value's RLP. // expect a specific next key, and get the value's RLP.
// if the key isn't found, the position isn't advanced. // if the key isn't found, the position isn't advanced.
fn expect_raw(&mut self, key: Key) -> Result<UntrustedRlp<'a>, DecoderError> { fn expect_raw(&mut self, key: Key) -> Result<UntrustedRlp<'a>, DecoderError> {
trace!(target: "les", "Expecting key {}", key.as_str());
let pre_pos = self.pos; let pre_pos = self.pos;
if let Some((k, val)) = try!(self.get_next()) { if let Some((k, val)) = try!(self.get_next()) {
if k == key { return Ok(val) } if k == key { return Ok(val) }
@ -164,7 +145,7 @@ pub struct Status {
/// Protocol version. /// Protocol version.
pub protocol_version: u32, pub protocol_version: u32,
/// Network id of this peer. /// Network id of this peer.
pub network_id: NetworkId, pub network_id: u64,
/// Total difficulty of the head of the chain. /// Total difficulty of the head of the chain.
pub head_td: U256, pub head_td: U256,
/// Hash of the best block. /// Hash of the best block.
@ -217,7 +198,7 @@ impl Capabilities {
/// - chain status /// - chain status
/// - serving capabilities /// - serving capabilities
/// - buffer flow parameters /// - buffer flow parameters
pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowParams), DecoderError> { pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, Option<FlowParams>), DecoderError> {
let mut parser = Parser { let mut parser = Parser {
pos: 0, pos: 0,
rlp: rlp, rlp: rlp,
@ -225,8 +206,7 @@ pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowP
let status = Status { let status = Status {
protocol_version: try!(parser.expect(Key::ProtocolVersion)), protocol_version: try!(parser.expect(Key::ProtocolVersion)),
network_id: try!(parser.expect(Key::NetworkId) network_id: try!(parser.expect(Key::NetworkId)),
.and_then(|id: u32| NetworkId::from_raw(id).ok_or(DecoderError::Custom("Invalid network ID")))),
head_td: try!(parser.expect(Key::HeadTD)), head_td: try!(parser.expect(Key::HeadTD)),
head_hash: try!(parser.expect(Key::HeadHash)), head_hash: try!(parser.expect(Key::HeadHash)),
head_num: try!(parser.expect(Key::HeadNum)), head_num: try!(parser.expect(Key::HeadNum)),
@ -241,20 +221,23 @@ pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowP
tx_relay: parser.expect_raw(Key::TxRelay).is_ok(), tx_relay: parser.expect_raw(Key::TxRelay).is_ok(),
}; };
let flow_params = FlowParams::new( let flow_params = match (
try!(parser.expect(Key::BufferLimit)), parser.expect(Key::BufferLimit),
try!(parser.expect(Key::BufferCostTable)), parser.expect(Key::BufferCostTable),
try!(parser.expect(Key::BufferRechargeRate)), parser.expect(Key::BufferRechargeRate)
); ) {
(Ok(bl), Ok(bct), Ok(brr)) => Some(FlowParams::new(bl, bct, brr)),
_ => None,
};
Ok((status, capabilities, flow_params)) Ok((status, capabilities, flow_params))
} }
/// Write a handshake, given status, capabilities, and flow parameters. /// Write a handshake, given status, capabilities, and flow parameters.
pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params: &FlowParams) -> Vec<u8> { pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params: Option<&FlowParams>) -> Vec<u8> {
let mut pairs = Vec::new(); let mut pairs = Vec::new();
pairs.push(encode_pair(Key::ProtocolVersion, &status.protocol_version)); pairs.push(encode_pair(Key::ProtocolVersion, &status.protocol_version));
pairs.push(encode_pair(Key::NetworkId, &(status.network_id as u32))); pairs.push(encode_pair(Key::NetworkId, &(status.network_id as u64)));
pairs.push(encode_pair(Key::HeadTD, &status.head_td)); pairs.push(encode_pair(Key::HeadTD, &status.head_td));
pairs.push(encode_pair(Key::HeadHash, &status.head_hash)); pairs.push(encode_pair(Key::HeadHash, &status.head_hash));
pairs.push(encode_pair(Key::HeadNum, &status.head_num)); pairs.push(encode_pair(Key::HeadNum, &status.head_num));
@ -273,9 +256,11 @@ pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params
pairs.push(encode_flag(Key::TxRelay)); pairs.push(encode_flag(Key::TxRelay));
} }
if let Some(flow_params) = flow_params {
pairs.push(encode_pair(Key::BufferLimit, flow_params.limit())); pairs.push(encode_pair(Key::BufferLimit, flow_params.limit()));
pairs.push(encode_pair(Key::BufferCostTable, flow_params.cost_table())); pairs.push(encode_pair(Key::BufferCostTable, flow_params.cost_table()));
pairs.push(encode_pair(Key::BufferRechargeRate, flow_params.recharge_rate())); pairs.push(encode_pair(Key::BufferRechargeRate, flow_params.recharge_rate()));
}
let mut stream = RlpStream::new_list(pairs.len()); let mut stream = RlpStream::new_list(pairs.len());
@ -385,7 +370,7 @@ mod tests {
fn full_handshake() { fn full_handshake() {
let status = Status { let status = Status {
protocol_version: 1, protocol_version: 1,
network_id: NetworkId::Mainnet, network_id: 1,
head_td: U256::default(), head_td: U256::default(),
head_hash: H256::default(), head_hash: H256::default(),
head_num: 10, head_num: 10,
@ -406,21 +391,21 @@ mod tests {
1000.into(), 1000.into(),
); );
let handshake = write_handshake(&status, &capabilities, &flow_params); let handshake = write_handshake(&status, &capabilities, Some(&flow_params));
let (read_status, read_capabilities, read_flow) let (read_status, read_capabilities, read_flow)
= parse_handshake(UntrustedRlp::new(&handshake)).unwrap(); = parse_handshake(UntrustedRlp::new(&handshake)).unwrap();
assert_eq!(read_status, status); assert_eq!(read_status, status);
assert_eq!(read_capabilities, capabilities); assert_eq!(read_capabilities, capabilities);
assert_eq!(read_flow, flow_params); assert_eq!(read_flow.unwrap(), flow_params);
} }
#[test] #[test]
fn partial_handshake() { fn partial_handshake() {
let status = Status { let status = Status {
protocol_version: 1, protocol_version: 1,
network_id: NetworkId::Mainnet, network_id: 1,
head_td: U256::default(), head_td: U256::default(),
head_hash: H256::default(), head_hash: H256::default(),
head_num: 10, head_num: 10,
@ -441,21 +426,21 @@ mod tests {
1000.into(), 1000.into(),
); );
let handshake = write_handshake(&status, &capabilities, &flow_params); let handshake = write_handshake(&status, &capabilities, Some(&flow_params));
let (read_status, read_capabilities, read_flow) let (read_status, read_capabilities, read_flow)
= parse_handshake(UntrustedRlp::new(&handshake)).unwrap(); = parse_handshake(UntrustedRlp::new(&handshake)).unwrap();
assert_eq!(read_status, status); assert_eq!(read_status, status);
assert_eq!(read_capabilities, capabilities); assert_eq!(read_capabilities, capabilities);
assert_eq!(read_flow, flow_params); assert_eq!(read_flow.unwrap(), flow_params);
} }
#[test] #[test]
fn skip_unknown_keys() { fn skip_unknown_keys() {
let status = Status { let status = Status {
protocol_version: 1, protocol_version: 1,
network_id: NetworkId::Mainnet, network_id: 1,
head_td: U256::default(), head_td: U256::default(),
head_hash: H256::default(), head_hash: H256::default(),
head_num: 10, head_num: 10,
@ -476,7 +461,7 @@ mod tests {
1000.into(), 1000.into(),
); );
let handshake = write_handshake(&status, &capabilities, &flow_params); let handshake = write_handshake(&status, &capabilities, Some(&flow_params));
let interleaved = { let interleaved = {
let handshake = UntrustedRlp::new(&handshake); let handshake = UntrustedRlp::new(&handshake);
let mut stream = RlpStream::new_list(handshake.item_count() * 3); let mut stream = RlpStream::new_list(handshake.item_count() * 3);
@ -498,7 +483,7 @@ mod tests {
assert_eq!(read_status, status); assert_eq!(read_status, status);
assert_eq!(read_capabilities, capabilities); assert_eq!(read_capabilities, capabilities);
assert_eq!(read_flow, flow_params); assert_eq!(read_flow.unwrap(), flow_params);
} }
#[test] #[test]
@ -548,4 +533,33 @@ mod tests {
let out = stream.drain(); let out = stream.drain();
assert!(parse_announcement(UntrustedRlp::new(&out)).is_ok()); assert!(parse_announcement(UntrustedRlp::new(&out)).is_ok());
} }
#[test]
fn optional_flow() {
let status = Status {
protocol_version: 1,
network_id: 1,
head_td: U256::default(),
head_hash: H256::default(),
head_num: 10,
genesis_hash: H256::zero(),
last_head: None,
};
let capabilities = Capabilities {
serve_headers: true,
serve_chain_since: Some(5),
serve_state_since: Some(8),
tx_relay: true,
};
let handshake = write_handshake(&status, &capabilities, None);
let (read_status, read_capabilities, read_flow)
= parse_handshake(UntrustedRlp::new(&handshake)).unwrap();
assert_eq!(read_status, status);
assert_eq!(read_capabilities, capabilities);
assert!(read_flow.is_none());
}
} }

View File

@ -0,0 +1,512 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Tests for the `LightProtocol` implementation.
//! These don't test of the higher level logic on top of
use ethcore::blockchain_info::BlockChainInfo;
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient};
use ethcore::ids::BlockID;
use ethcore::transaction::SignedTransaction;
use network::PeerId;
use net::buffer_flow::FlowParams;
use net::context::IoContext;
use net::status::{Capabilities, Status, write_handshake};
use net::{encode_request, LightProtocol, Params, packet};
use provider::Provider;
use request::{self, Request, Headers};
use rlp::*;
use util::{Bytes, H256, U256};
use std::sync::Arc;
// expected result from a call.
#[derive(Debug, PartialEq, Eq)]
enum Expect {
/// Expect to have message sent to peer.
Send(PeerId, u8, Vec<u8>),
/// Expect this response.
Respond(u8, Vec<u8>),
/// Expect a punishment (disconnect/disable)
Punish(PeerId),
/// Expect nothing.
Nothing,
}
impl IoContext for Expect {
fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec<u8>) {
assert_eq!(self, &Expect::Send(peer, packet_id, packet_body));
}
fn respond(&self, packet_id: u8, packet_body: Vec<u8>) {
assert_eq!(self, &Expect::Respond(packet_id, packet_body));
}
fn disconnect_peer(&self, peer: PeerId) {
assert_eq!(self, &Expect::Punish(peer));
}
fn disable_peer(&self, peer: PeerId) {
assert_eq!(self, &Expect::Punish(peer));
}
fn protocol_version(&self, _peer: PeerId) -> Option<u8> {
Some(super::MAX_PROTOCOL_VERSION)
}
}
// can't implement directly for Arc due to cross-crate orphan rules.
struct TestProvider(Arc<TestProviderInner>);
struct TestProviderInner {
client: TestBlockChainClient,
}
impl Provider for TestProvider {
fn chain_info(&self) -> BlockChainInfo {
self.0.client.chain_info()
}
fn reorg_depth(&self, a: &H256, b: &H256) -> Option<u64> {
self.0.client.tree_route(a, b).map(|route| route.index as u64)
}
fn earliest_state(&self) -> Option<u64> {
None
}
fn block_headers(&self, req: request::Headers) -> Vec<Bytes> {
let best_num = self.0.client.chain_info().best_block_number;
let start_num = req.block_num;
match self.0.client.block_hash(BlockID::Number(req.block_num)) {
Some(hash) if hash == req.block_hash => {}
_=> {
trace!(target: "les_provider", "unknown/non-canonical start block in header request: {:?}", (req.block_num, req.block_hash));
return vec![]
}
}
(0u64..req.max as u64)
.map(|x: u64| x.saturating_mul(req.skip + 1))
.take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num >= *x })
.map(|x| if req.reverse { start_num - x } else { start_num + x })
.map(|x| self.0.client.block_header(BlockID::Number(x)))
.take_while(|x| x.is_some())
.flat_map(|x| x)
.collect()
}
fn block_bodies(&self, req: request::Bodies) -> Vec<Bytes> {
req.block_hashes.into_iter()
.map(|hash| self.0.client.block_body(BlockID::Hash(hash)))
.map(|body| body.unwrap_or_else(|| ::rlp::EMPTY_LIST_RLP.to_vec()))
.collect()
}
fn receipts(&self, req: request::Receipts) -> Vec<Bytes> {
req.block_hashes.into_iter()
.map(|hash| self.0.client.block_receipts(&hash))
.map(|receipts| receipts.unwrap_or_else(|| ::rlp::EMPTY_LIST_RLP.to_vec()))
.collect()
}
fn proofs(&self, req: request::StateProofs) -> Vec<Bytes> {
req.requests.into_iter()
.map(|req| {
match req.key2 {
Some(_) => ::util::sha3::SHA3_NULL_RLP.to_vec(),
None => {
// sort of a leaf node
let mut stream = RlpStream::new_list(2);
stream.append(&req.key1).append_empty_data();
stream.out()
}
}
})
.collect()
}
fn contract_code(&self, req: request::ContractCodes) -> Vec<Bytes> {
req.code_requests.into_iter()
.map(|req| {
req.account_key.iter().chain(req.account_key.iter()).cloned().collect()
})
.collect()
}
fn header_proofs(&self, req: request::HeaderProofs) -> Vec<Bytes> {
req.requests.into_iter().map(|_| ::rlp::EMPTY_LIST_RLP.to_vec()).collect()
}
fn pending_transactions(&self) -> Vec<SignedTransaction> {
self.0.client.pending_transactions()
}
}
fn make_flow_params() -> FlowParams {
FlowParams::new(5_000_000.into(), Default::default(), 100_000.into())
}
fn capabilities() -> Capabilities {
Capabilities {
serve_headers: true,
serve_chain_since: Some(1),
serve_state_since: Some(1),
tx_relay: true,
}
}
// helper for setting up the protocol handler and provider.
fn setup(flow_params: FlowParams, capabilities: Capabilities) -> (Arc<TestProviderInner>, LightProtocol) {
let provider = Arc::new(TestProviderInner {
client: TestBlockChainClient::new(),
});
let proto = LightProtocol::new(Arc::new(TestProvider(provider.clone())), Params {
network_id: 2,
flow_params: flow_params,
capabilities: capabilities,
});
(provider, proto)
}
fn status(chain_info: BlockChainInfo) -> Status {
Status {
protocol_version: 1,
network_id: 2,
head_td: chain_info.total_difficulty,
head_hash: chain_info.best_block_hash,
head_num: chain_info.best_block_number,
genesis_hash: chain_info.genesis_hash,
last_head: None,
}
}
#[test]
fn handshake_expected() {
let flow_params = make_flow_params();
let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let status = status(provider.client.chain_info());
let packet_body = write_handshake(&status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
}
#[test]
#[should_panic]
fn genesis_mismatch() {
let flow_params = make_flow_params();
let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let mut status = status(provider.client.chain_info());
status.genesis_hash = H256::default();
let packet_body = write_handshake(&status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
}
#[test]
fn buffer_overflow() {
let flow_params = make_flow_params();
let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let status = status(provider.client.chain_info());
{
let packet_body = write_handshake(&status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
}
{
let my_status = write_handshake(&status, &capabilities, Some(&flow_params));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
}
// 1000 requests is far too many for the default flow params.
let request = encode_request(&Request::Headers(Headers {
block_num: 1,
block_hash: provider.client.chain_info().genesis_hash,
max: 1000,
skip: 0,
reverse: false,
}), 111);
proto.handle_packet(&Expect::Punish(1), &1, packet::GET_BLOCK_HEADERS, &request);
}
// test the basic request types -- these just make sure that requests are parsed
// and sent to the provider correctly as well as testing response formatting.
#[test]
fn get_block_headers() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let cur_status = status(provider.client.chain_info());
let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params));
provider.client.add_blocks(100, EachBlockWith::Nothing);
let cur_status = status(provider.client.chain_info());
{
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
}
let request = Headers {
block_num: 1,
block_hash: provider.client.block_hash(BlockID::Number(1)).unwrap(),
max: 10,
skip: 0,
reverse: false,
};
let req_id = 111;
let request_body = encode_request(&Request::Headers(request.clone()), req_id);
let response = {
let headers: Vec<_> = (0..10).map(|i| provider.client.block_header(BlockID::Number(i + 1)).unwrap()).collect();
assert_eq!(headers.len(), 10);
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Headers, 10);
let mut response_stream = RlpStream::new_list(12);
response_stream.append(&req_id).append(&new_buf);
for header in headers {
response_stream.append_raw(&header, 1);
}
response_stream.out()
};
let expected = Expect::Respond(packet::BLOCK_HEADERS, response);
proto.handle_packet(&expected, &1, packet::GET_BLOCK_HEADERS, &request_body);
}
#[test]
fn get_block_bodies() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let cur_status = status(provider.client.chain_info());
let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params));
provider.client.add_blocks(100, EachBlockWith::Nothing);
let cur_status = status(provider.client.chain_info());
{
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
}
let request = request::Bodies {
block_hashes: (0..10).map(|i| provider.client.block_hash(BlockID::Number(i)).unwrap()).collect(),
};
let req_id = 111;
let request_body = encode_request(&Request::Bodies(request.clone()), req_id);
let response = {
let bodies: Vec<_> = (0..10).map(|i| provider.client.block_body(BlockID::Number(i + 1)).unwrap()).collect();
assert_eq!(bodies.len(), 10);
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Bodies, 10);
let mut response_stream = RlpStream::new_list(12);
response_stream.append(&req_id).append(&new_buf);
for body in bodies {
response_stream.append_raw(&body, 1);
}
response_stream.out()
};
let expected = Expect::Respond(packet::BLOCK_BODIES, response);
proto.handle_packet(&expected, &1, packet::GET_BLOCK_BODIES, &request_body);
}
#[test]
fn get_block_receipts() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let cur_status = status(provider.client.chain_info());
let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params));
provider.client.add_blocks(1000, EachBlockWith::Nothing);
let cur_status = status(provider.client.chain_info());
{
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
}
// find the first 10 block hashes starting with `f` because receipts are only provided
// by the test client in that case.
let block_hashes: Vec<_> = (0..1000).map(|i| provider.client.block_hash(BlockID::Number(i)).unwrap())
.filter(|hash| format!("{}", hash).starts_with("f")).take(10).collect();
let request = request::Receipts {
block_hashes: block_hashes.clone(),
};
let req_id = 111;
let request_body = encode_request(&Request::Receipts(request.clone()), req_id);
let response = {
let receipts: Vec<_> = block_hashes.iter()
.map(|hash| provider.client.block_receipts(hash).unwrap())
.collect();
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Receipts, receipts.len());
let mut response_stream = RlpStream::new_list(2 + receipts.len());
response_stream.append(&req_id).append(&new_buf);
for block_receipts in receipts {
response_stream.append_raw(&block_receipts, 1);
}
response_stream.out()
};
let expected = Expect::Respond(packet::RECEIPTS, response);
proto.handle_packet(&expected, &1, packet::GET_RECEIPTS, &request_body);
}
#[test]
fn get_state_proofs() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let cur_status = status(provider.client.chain_info());
{
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone()));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body);
}
let req_id = 112;
let key1 = U256::from(11223344).into();
let key2 = U256::from(99988887).into();
let request = Request::StateProofs (request::StateProofs {
requests: vec![
request::StateProof { block: H256::default(), key1: key1, key2: None, from_level: 0 },
request::StateProof { block: H256::default(), key1: key1, key2: Some(key2), from_level: 0},
]
});
let request_body = encode_request(&request, req_id);
let response = {
let proofs = vec![
{ let mut stream = RlpStream::new_list(2); stream.append(&key1).append_empty_data(); stream.out() },
::util::sha3::SHA3_NULL_RLP.to_vec(),
];
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::StateProofs, 2);
let mut response_stream = RlpStream::new_list(4);
response_stream.append(&req_id).append(&new_buf);
for proof in proofs {
response_stream.append_raw(&proof, 1);
}
response_stream.out()
};
let expected = Expect::Respond(packet::PROOFS, response);
proto.handle_packet(&expected, &1, packet::GET_PROOFS, &request_body);
}
#[test]
fn get_contract_code() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let cur_status = status(provider.client.chain_info());
{
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone()));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body);
}
let req_id = 112;
let key1 = U256::from(11223344).into();
let key2 = U256::from(99988887).into();
let request = Request::Codes (request::ContractCodes {
code_requests: vec![
request::ContractCode { block_hash: H256::default(), account_key: key1 },
request::ContractCode { block_hash: H256::default(), account_key: key2 },
],
});
let request_body = encode_request(&request, req_id);
let response = {
let codes: Vec<Vec<_>> = vec![
key1.iter().chain(key1.iter()).cloned().collect(),
key2.iter().chain(key2.iter()).cloned().collect(),
];
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Codes, 2);
let mut response_stream = RlpStream::new_list(4);
response_stream.append(&req_id).append(&new_buf);
for code in codes {
response_stream.append(&code);
}
response_stream.out()
};
let expected = Expect::Respond(packet::CONTRACT_CODES, response);
proto.handle_packet(&expected, &1, packet::GET_CONTRACT_CODES, &request_body);
}

View File

@ -33,6 +33,7 @@ use request;
/// or empty vector where appropriate. /// or empty vector where appropriate.
/// ///
/// [1]: https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES) /// [1]: https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES)
#[cfg_attr(feature = "ipc", ipc(client_ident="LightProviderClient"))]
pub trait Provider: Send + Sync { pub trait Provider: Send + Sync {
/// Provide current blockchain info. /// Provide current blockchain info.
fn chain_info(&self) -> BlockChainInfo; fn chain_info(&self) -> BlockChainInfo;
@ -71,7 +72,10 @@ pub trait Provider: Send + Sync {
/// Each item in the resulting vector is either the raw bytecode or empty. /// Each item in the resulting vector is either the raw bytecode or empty.
fn contract_code(&self, req: request::ContractCodes) -> Vec<Bytes>; fn contract_code(&self, req: request::ContractCodes) -> Vec<Bytes>;
/// Provide header proofs from the Canonical Hash Tries. /// Provide header proofs from the Canonical Hash Tries as well as the headers
/// they correspond to -- each element in the returned vector is a 2-tuple.
/// The first element is a block header and the second a merkle proof of
/// the header in a requested CHT.
fn header_proofs(&self, req: request::HeaderProofs) -> Vec<Bytes>; fn header_proofs(&self, req: request::HeaderProofs) -> Vec<Bytes>;
/// Provide pending transactions. /// Provide pending transactions.
@ -105,8 +109,8 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
} }
(0u64..req.max as u64) (0u64..req.max as u64)
.map(|x: u64| x.saturating_mul(req.skip)) .map(|x: u64| x.saturating_mul(req.skip + 1))
.take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num < *x }) .take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num >= *x })
.map(|x| if req.reverse { start_num - x } else { start_num + x }) .map(|x| if req.reverse { start_num - x } else { start_num + x })
.map(|x| self.block_header(BlockID::Number(x))) .map(|x| self.block_header(BlockID::Number(x)))
.take_while(|x| x.is_some()) .take_while(|x| x.is_some())

View File

@ -19,7 +19,8 @@
use util::H256; use util::H256;
/// A request for block headers. /// A request for block headers.
#[derive(Debug, Clone, PartialEq, Eq, Binary)] #[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct Headers { pub struct Headers {
/// Starting block number /// Starting block number
pub block_num: u64, pub block_num: u64,
@ -35,7 +36,8 @@ pub struct Headers {
} }
/// A request for specific block bodies. /// A request for specific block bodies.
#[derive(Debug, Clone, PartialEq, Eq, Binary)] #[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct Bodies { pub struct Bodies {
/// Hashes which bodies are being requested for. /// Hashes which bodies are being requested for.
pub block_hashes: Vec<H256> pub block_hashes: Vec<H256>
@ -45,14 +47,16 @@ pub struct Bodies {
/// ///
/// This request is answered with a list of transaction receipts for each block /// This request is answered with a list of transaction receipts for each block
/// requested. /// requested.
#[derive(Debug, Clone, PartialEq, Eq, Binary)] #[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct Receipts { pub struct Receipts {
/// Block hashes to return receipts for. /// Block hashes to return receipts for.
pub block_hashes: Vec<H256>, pub block_hashes: Vec<H256>,
} }
/// A request for a state proof /// A request for a state proof
#[derive(Debug, Clone, PartialEq, Eq, Binary)] #[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct StateProof { pub struct StateProof {
/// Block hash to query state from. /// Block hash to query state from.
pub block: H256, pub block: H256,
@ -66,14 +70,16 @@ pub struct StateProof {
} }
/// A request for state proofs. /// A request for state proofs.
#[derive(Debug, Clone, PartialEq, Eq, Binary)] #[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct StateProofs { pub struct StateProofs {
/// All the proof requests. /// All the proof requests.
pub requests: Vec<StateProof>, pub requests: Vec<StateProof>,
} }
/// A request for contract code. /// A request for contract code.
#[derive(Debug, Clone, PartialEq, Eq, Binary)] #[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct ContractCode { pub struct ContractCode {
/// Block hash /// Block hash
pub block_hash: H256, pub block_hash: H256,
@ -82,14 +88,16 @@ pub struct ContractCode {
} }
/// A request for contract code. /// A request for contract code.
#[derive(Debug, Clone, PartialEq, Eq, Binary)] #[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct ContractCodes { pub struct ContractCodes {
/// Block hash and account key (== sha3(address)) pairs to fetch code for. /// Block hash and account key (== sha3(address)) pairs to fetch code for.
pub code_requests: Vec<ContractCode>, pub code_requests: Vec<ContractCode>,
} }
/// A request for a header proof from the Canonical Hash Trie. /// A request for a header proof from the Canonical Hash Trie.
#[derive(Debug, Clone, PartialEq, Eq, Binary)] #[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct HeaderProof { pub struct HeaderProof {
/// Number of the CHT. /// Number of the CHT.
pub cht_number: u64, pub cht_number: u64,
@ -100,14 +108,16 @@ pub struct HeaderProof {
} }
/// A request for header proofs from the CHT. /// A request for header proofs from the CHT.
#[derive(Debug, Clone, PartialEq, Eq, Binary)] #[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct HeaderProofs { pub struct HeaderProofs {
/// All the proof requests. /// All the proof requests.
pub requests: Vec<HeaderProof>, pub requests: Vec<HeaderProof>,
} }
/// Kinds of requests. /// Kinds of requests.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Binary)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub enum Kind { pub enum Kind {
/// Requesting headers. /// Requesting headers.
Headers, Headers,
@ -124,7 +134,8 @@ pub enum Kind {
} }
/// Encompasses all possible types of requests in a single structure. /// Encompasses all possible types of requests in a single structure.
#[derive(Debug, Clone, PartialEq, Eq, Binary)] #[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub enum Request { pub enum Request {
/// Requesting headers. /// Requesting headers.
Headers(Headers), Headers(Headers),

View File

@ -15,6 +15,11 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Types used in the public (IPC) api which require custom code generation. //! Types used in the public (IPC) api which require custom code generation.
#![cfg_attr(feature = "ipc", allow(dead_code, unused_assignments, unused_variables))] // codegen issues
#![allow(dead_code, unused_assignments, unused_variables)] // codegen issues
#[cfg(feature = "ipc")]
include!(concat!(env!("OUT_DIR"), "/mod.rs.in")); include!(concat!(env!("OUT_DIR"), "/mod.rs.in"));
#[cfg(not(feature = "ipc"))]
include!("mod.rs.in");

View File

@ -52,7 +52,7 @@ use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute};
use client::{ use client::{
BlockID, TransactionID, UncleID, TraceId, ClientConfig, BlockChainClient, BlockID, TransactionID, UncleID, TraceId, ClientConfig, BlockChainClient,
MiningBlockChainClient, TraceFilter, CallAnalytics, BlockImportError, Mode, MiningBlockChainClient, TraceFilter, CallAnalytics, BlockImportError, Mode,
ChainNotify, PruningInfo, ProvingBlockChainClient, ChainNotify, PruningInfo,
}; };
use client::Error as ClientError; use client::Error as ClientError;
use env_info::EnvInfo; use env_info::EnvInfo;
@ -1391,7 +1391,7 @@ impl MayPanic for Client {
} }
} }
impl ProvingBlockChainClient for Client { impl ::client::ProvingBlockChainClient for Client {
fn prove_storage(&self, key1: H256, key2: H256, from_level: u32, id: BlockID) -> Vec<Bytes> { fn prove_storage(&self, key1: H256, key2: H256, from_level: u32, id: BlockID) -> Vec<Bytes> {
self.state_at(id) self.state_at(id)
.and_then(move |state| state.prove_storage(key1, key2, from_level).ok()) .and_then(move |state| state.prove_storage(key1, key2, from_level).ok())

View File

@ -27,7 +27,9 @@ pub use self::config::{Mode, ClientConfig, DatabaseCompactionProfile, BlockChain
pub use self::error::Error; pub use self::error::Error;
pub use self::test_client::{TestBlockChainClient, EachBlockWith}; pub use self::test_client::{TestBlockChainClient, EachBlockWith};
pub use self::chain_notify::ChainNotify; pub use self::chain_notify::ChainNotify;
pub use self::traits::{BlockChainClient, MiningBlockChainClient, ProvingBlockChainClient}; pub use self::traits::{BlockChainClient, MiningBlockChainClient};
pub use self::traits::ProvingBlockChainClient;
pub use types::ids::*; pub use types::ids::*;
pub use types::trace_filter::Filter as TraceFilter; pub use types::trace_filter::Filter as TraceFilter;

View File

@ -92,8 +92,8 @@ pub struct TestBlockChainClient {
pub first_block: RwLock<Option<(H256, u64)>>, pub first_block: RwLock<Option<(H256, u64)>>,
} }
#[derive(Clone)]
/// Used for generating test client blocks. /// Used for generating test client blocks.
#[derive(Clone)]
pub enum EachBlockWith { pub enum EachBlockWith {
/// Plain block. /// Plain block.
Nothing, Nothing,

View File

@ -31,6 +31,7 @@ use transaction::SignedTransaction;
use state_db::StateDB; use state_db::StateDB;
use util::*; use util::*;
use util::trie::recorder::{Recorder, BasicRecorder as TrieRecorder}; use util::trie::recorder::{Recorder, BasicRecorder as TrieRecorder};
mod account; mod account;

View File

@ -32,6 +32,7 @@ warp = true
allow_ips = "all" allow_ips = "all"
snapshot_peers = 0 snapshot_peers = 0
max_pending_peers = 64 max_pending_peers = 64
serve_light = true
reserved_only = false reserved_only = false
reserved_peers = "./path_to_file" reserved_peers = "./path_to_file"

View File

@ -135,6 +135,8 @@ usage! {
flag_reserved_only: bool = false, flag_reserved_only: bool = false,
or |c: &Config| otry!(c.network).reserved_only.clone(), or |c: &Config| otry!(c.network).reserved_only.clone(),
flag_no_ancient_blocks: bool = false, or |_| None, flag_no_ancient_blocks: bool = false, or |_| None,
flag_serve_light: bool = false,
or |c: &Config| otry!(c.network).serve_light.clone(),
// -- API and Console Options // -- API and Console Options
// RPC // RPC
@ -340,6 +342,7 @@ struct Network {
node_key: Option<String>, node_key: Option<String>,
reserved_peers: Option<String>, reserved_peers: Option<String>,
reserved_only: Option<bool>, reserved_only: Option<bool>,
serve_light: Option<bool>,
} }
#[derive(Default, Debug, PartialEq, RustcDecodable)] #[derive(Default, Debug, PartialEq, RustcDecodable)]
@ -552,6 +555,7 @@ mod tests {
flag_reserved_peers: Some("./path_to_file".into()), flag_reserved_peers: Some("./path_to_file".into()),
flag_reserved_only: false, flag_reserved_only: false,
flag_no_ancient_blocks: false, flag_no_ancient_blocks: false,
flag_serve_light: true,
// -- API and Console Options // -- API and Console Options
// RPC // RPC
@ -725,6 +729,7 @@ mod tests {
node_key: None, node_key: None,
reserved_peers: Some("./path/to/reserved_peers".into()), reserved_peers: Some("./path/to/reserved_peers".into()),
reserved_only: Some(true), reserved_only: Some(true),
serve_light: None,
}), }),
rpc: Some(Rpc { rpc: Some(Rpc {
disable: Some(true), disable: Some(true),

View File

@ -97,6 +97,7 @@ Networking Options:
--max-pending-peers NUM Allow up to NUM pending connections. (default: {flag_max_pending_peers}) --max-pending-peers NUM Allow up to NUM pending connections. (default: {flag_max_pending_peers})
--no-ancient-blocks Disable downloading old blocks after snapshot restoration --no-ancient-blocks Disable downloading old blocks after snapshot restoration
or warp sync. (default: {flag_no_ancient_blocks}) or warp sync. (default: {flag_no_ancient_blocks})
--serve-light Experimental: Serve light client peers. (default: {flag_serve_light})
API and Console Options: API and Console Options:
--no-jsonrpc Disable the JSON-RPC API server. (default: {flag_no_jsonrpc}) --no-jsonrpc Disable the JSON-RPC API server. (default: {flag_no_jsonrpc})

View File

@ -279,6 +279,7 @@ impl Configuration {
no_periodic_snapshot: self.args.flag_no_periodic_snapshot, no_periodic_snapshot: self.args.flag_no_periodic_snapshot,
check_seal: !self.args.flag_no_seal_check, check_seal: !self.args.flag_no_seal_check,
download_old_blocks: !self.args.flag_no_ancient_blocks, download_old_blocks: !self.args.flag_no_ancient_blocks,
serve_light: self.args.flag_serve_light,
verifier_settings: verifier_settings, verifier_settings: verifier_settings,
}; };
Cmd::Run(run_cmd) Cmd::Run(run_cmd)
@ -942,6 +943,7 @@ mod tests {
no_periodic_snapshot: false, no_periodic_snapshot: false,
check_seal: true, check_seal: true,
download_old_blocks: true, download_old_blocks: true,
serve_light: false,
verifier_settings: Default::default(), verifier_settings: Default::default(),
})); }));
} }

View File

@ -43,6 +43,7 @@ extern crate serde;
extern crate serde_json; extern crate serde_json;
extern crate rlp; extern crate rlp;
extern crate ethcore_hash_fetch as hash_fetch; extern crate ethcore_hash_fetch as hash_fetch;
extern crate ethcore_light as light;
extern crate ethcore_ipc_hypervisor as hypervisor; extern crate ethcore_ipc_hypervisor as hypervisor;
extern crate ethcore_rpc; extern crate ethcore_rpc;

View File

@ -15,16 +15,22 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc; use std::sync::Arc;
use std::path::Path;
use ethcore::client::BlockChainClient; use ethcore::client::BlockChainClient;
use hypervisor::Hypervisor; use hypervisor::Hypervisor;
use ethsync::{SyncConfig, NetworkConfiguration, NetworkError}; use ethsync::{SyncConfig, NetworkConfiguration, NetworkError, Params};
use ethcore::snapshot::SnapshotService; use ethcore::snapshot::SnapshotService;
use light::Provider;
#[cfg(not(feature="ipc"))] #[cfg(not(feature="ipc"))]
use self::no_ipc_deps::*; use self::no_ipc_deps::*;
#[cfg(not(feature="ipc"))]
use ethcore_logger::Config as LogConfig;
#[cfg(feature="ipc")] #[cfg(feature="ipc")]
use self::ipc_deps::*; use self::ipc_deps::*;
use ethcore_logger::Config as LogConfig;
use std::path::Path;
#[cfg(feature="ipc")] #[cfg(feature="ipc")]
pub mod service_urls { pub mod service_urls {
@ -36,6 +42,8 @@ pub mod service_urls {
pub const SYNC_NOTIFY: &'static str = "parity-sync-notify.ipc"; pub const SYNC_NOTIFY: &'static str = "parity-sync-notify.ipc";
pub const NETWORK_MANAGER: &'static str = "parity-manage-net.ipc"; pub const NETWORK_MANAGER: &'static str = "parity-manage-net.ipc";
pub const SYNC_CONTROL: &'static str = "parity-sync-control.ipc"; pub const SYNC_CONTROL: &'static str = "parity-sync-control.ipc";
pub const LIGHT_PROVIDER: &'static str = "parity-light-provider.ipc";
#[cfg(feature="stratum")] #[cfg(feature="stratum")]
pub const STRATUM: &'static str = "parity-stratum.ipc"; pub const STRATUM: &'static str = "parity-stratum.ipc";
#[cfg(feature="stratum")] #[cfg(feature="stratum")]
@ -75,6 +83,7 @@ mod ipc_deps {
pub use nanoipc::{GuardedSocket, NanoSocket, generic_client, fast_client}; pub use nanoipc::{GuardedSocket, NanoSocket, generic_client, fast_client};
pub use ipc::IpcSocket; pub use ipc::IpcSocket;
pub use ipc::binary::serialize; pub use ipc::binary::serialize;
pub use light::remote::LightProviderClient;
} }
#[cfg(feature="ipc")] #[cfg(feature="ipc")]
@ -124,6 +133,7 @@ pub fn sync
net_cfg: NetworkConfiguration, net_cfg: NetworkConfiguration,
_client: Arc<BlockChainClient>, _client: Arc<BlockChainClient>,
_snapshot_service: Arc<SnapshotService>, _snapshot_service: Arc<SnapshotService>,
_provider: Arc<Provider>,
log_settings: &LogConfig, log_settings: &LogConfig,
) )
-> Result<SyncModules, NetworkError> -> Result<SyncModules, NetworkError>
@ -141,6 +151,8 @@ pub fn sync
&service_urls::with_base(&hypervisor.io_path, service_urls::SYNC_NOTIFY)).unwrap(); &service_urls::with_base(&hypervisor.io_path, service_urls::SYNC_NOTIFY)).unwrap();
let manage_client = generic_client::<NetworkManagerClient<_>>( let manage_client = generic_client::<NetworkManagerClient<_>>(
&service_urls::with_base(&hypervisor.io_path, service_urls::NETWORK_MANAGER)).unwrap(); &service_urls::with_base(&hypervisor.io_path, service_urls::NETWORK_MANAGER)).unwrap();
let provider_client = generic_client::<LightProviderClient<_>>(
&service_urls::with_base(&hypervisor.io_path, service_urls::LIGHT_PROVIDER)).unwrap();
*hypervisor_ref = Some(hypervisor); *hypervisor_ref = Some(hypervisor);
Ok((sync_client, manage_client, notify_client)) Ok((sync_client, manage_client, notify_client))
@ -154,10 +166,18 @@ pub fn sync
net_cfg: NetworkConfiguration, net_cfg: NetworkConfiguration,
client: Arc<BlockChainClient>, client: Arc<BlockChainClient>,
snapshot_service: Arc<SnapshotService>, snapshot_service: Arc<SnapshotService>,
provider: Arc<Provider>,
_log_settings: &LogConfig, _log_settings: &LogConfig,
) )
-> Result<SyncModules, NetworkError> -> Result<SyncModules, NetworkError>
{ {
let eth_sync = try!(EthSync::new(sync_cfg, client, snapshot_service, net_cfg)); let eth_sync = try!(EthSync::new(Params {
config: sync_cfg,
chain: client,
provider: provider,
snapshot_service: snapshot_service,
network_config: net_cfg,
}));
Ok((eth_sync.clone() as Arc<SyncProvider>, eth_sync.clone() as Arc<ManageNetwork>, eth_sync.clone() as Arc<ChainNotify>)) Ok((eth_sync.clone() as Arc<SyncProvider>, eth_sync.clone() as Arc<ManageNetwork>, eth_sync.clone() as Arc<ChainNotify>))
} }

View File

@ -93,6 +93,7 @@ pub struct RunCmd {
pub no_periodic_snapshot: bool, pub no_periodic_snapshot: bool,
pub check_seal: bool, pub check_seal: bool,
pub download_old_blocks: bool, pub download_old_blocks: bool,
pub serve_light: bool,
pub verifier_settings: VerifierSettings, pub verifier_settings: VerifierSettings,
} }
@ -187,6 +188,11 @@ pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<(), String> {
); );
info!("Operating mode: {}", Colour::White.bold().paint(format!("{}", mode))); info!("Operating mode: {}", Colour::White.bold().paint(format!("{}", mode)));
if cmd.serve_light {
info!("Configured to serve light client peers. Please note this feature is {}.",
Colour::White.bold().paint("experimental".to_string()));
}
// display warning about using experimental journaldb alorithm // display warning about using experimental journaldb alorithm
if !algorithm.is_stable() { if !algorithm.is_stable() {
warn!("Your chosen strategy is {}! You can re-run with --pruning to change.", Colour::Red.bold().paint("unstable")); warn!("Your chosen strategy is {}! You can re-run with --pruning to change.", Colour::Red.bold().paint("unstable"));
@ -206,6 +212,7 @@ pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<(), String> {
sync_config.fork_block = spec.fork_block(); sync_config.fork_block = spec.fork_block();
sync_config.warp_sync = cmd.warp_sync; sync_config.warp_sync = cmd.warp_sync;
sync_config.download_old_blocks = cmd.download_old_blocks; sync_config.download_old_blocks = cmd.download_old_blocks;
sync_config.serve_light = cmd.serve_light;
let passwords = try!(passwords_from_files(&cmd.acc_conf.password_files)); let passwords = try!(passwords_from_files(&cmd.acc_conf.password_files));
@ -283,7 +290,13 @@ pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<(), String> {
// create sync object // create sync object
let (sync_provider, manage_network, chain_notify) = try!(modules::sync( let (sync_provider, manage_network, chain_notify) = try!(modules::sync(
&mut hypervisor, sync_config, net_conf.into(), client.clone(), snapshot_service.clone(), &cmd.logger_config, &mut hypervisor,
sync_config,
net_conf.into(),
client.clone(),
snapshot_service.clone(),
client.clone(),
&cmd.logger_config,
).map_err(|e| format!("Sync error: {}", e))); ).map_err(|e| format!("Sync error: {}", e)));
service.add_notify(chain_notify.clone()); service.add_notify(chain_notify.clone());

View File

@ -22,6 +22,7 @@ use hypervisor::{SYNC_MODULE_ID, HYPERVISOR_IPC_URL, ControlService};
use ethcore::client::ChainNotify; use ethcore::client::ChainNotify;
use ethcore::client::remote::RemoteClient; use ethcore::client::remote::RemoteClient;
use ethcore::snapshot::remote::RemoteSnapshotService; use ethcore::snapshot::remote::RemoteSnapshotService;
use light::remote::LightProviderClient;
use ethsync::{SyncProvider, EthSync, ManageNetwork, ServiceConfiguration}; use ethsync::{SyncProvider, EthSync, ManageNetwork, ServiceConfiguration};
use modules::service_urls; use modules::service_urls;
use boot; use boot;
@ -48,8 +49,15 @@ pub fn main() {
let remote_client = dependency!(RemoteClient, &service_urls::with_base(&service_config.io_path, service_urls::CLIENT)); let remote_client = dependency!(RemoteClient, &service_urls::with_base(&service_config.io_path, service_urls::CLIENT));
let remote_snapshot = dependency!(RemoteSnapshotService, &service_urls::with_base(&service_config.io_path, service_urls::SNAPSHOT)); let remote_snapshot = dependency!(RemoteSnapshotService, &service_urls::with_base(&service_config.io_path, service_urls::SNAPSHOT));
let remote_provider = dependency!(LightProviderClient, &service_urls::with_base(&service_config.io_path, service_urls::LIGHT_PROVIDER));
let sync = EthSync::new(service_config.sync, remote_client.service().clone(), remote_snapshot.service().clone(), service_config.net).unwrap(); let sync = EthSync::new(Params {
config: service_config.sync,
chain: remote_client.service().clone(),
snapshot_service: remote_snapshot.service().clone(),
provider: remote_provider.service().clone(),
network_config: service_config.net
}).unwrap();
let _ = boot::main_thread(); let _ = boot::main_thread();
let service_stop = Arc::new(AtomicBool::new(false)); let service_stop = Arc::new(AtomicBool::new(false));

View File

@ -17,4 +17,5 @@ export TARGETS="
-p ethcore-ipc \ -p ethcore-ipc \
-p ethcore-ipc-tests \ -p ethcore-ipc-tests \
-p ethcore-ipc-nano \ -p ethcore-ipc-nano \
-p ethcore-light \
-p parity" -p parity"

View File

@ -15,6 +15,7 @@ ethcore-ipc-codegen = { path = "../ipc/codegen" }
ethcore-util = { path = "../util" } ethcore-util = { path = "../util" }
ethcore-network = { path = "../util/network" } ethcore-network = { path = "../util/network" }
ethcore-io = { path = "../util/io" } ethcore-io = { path = "../util/io" }
ethcore-light = { path = "../ethcore/light"}
ethcore = { path = "../ethcore" } ethcore = { path = "../ethcore" }
rlp = { path = "../util/rlp" } rlp = { path = "../util/rlp" }
clippy = { version = "0.0.103", optional = true} clippy = { version = "0.0.103", optional = true}
@ -31,6 +32,6 @@ ethkey = { path = "../ethkey" }
parking_lot = "0.3" parking_lot = "0.3"
[features] [features]
default = ["ipc"] default = []
dev = ["clippy", "ethcore/dev", "ethcore-util/dev"] dev = ["clippy", "ethcore/dev", "ethcore-util/dev"]
ipc = [] ipc = ["ethcore-light/ipc"]

View File

@ -16,6 +16,10 @@
extern crate ethcore_ipc_codegen; extern crate ethcore_ipc_codegen;
#[cfg(feature = "ipc")]
fn main() { fn main() {
ethcore_ipc_codegen::derive_ipc_cond("src/api.rs", cfg!(feature="ipc")).unwrap(); ethcore_ipc_codegen::derive_ipc_cond("src/api.rs", true).unwrap();
} }
#[cfg(not(feature = "ipc"))]
fn main() {}

View File

@ -33,6 +33,7 @@ use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig};
use std::str::FromStr; use std::str::FromStr;
use parking_lot::RwLock; use parking_lot::RwLock;
use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT}; use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT};
use light::net::{LightProtocol, Params as LightParams, Capabilities, Handler as LightHandler, EventContext};
pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"par"; pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"par";
@ -47,10 +48,14 @@ pub struct SyncConfig {
pub network_id: u64, pub network_id: u64,
/// Main "eth" subprotocol name. /// Main "eth" subprotocol name.
pub subprotocol_name: [u8; 3], pub subprotocol_name: [u8; 3],
/// Light "les" subprotocol name.
pub light_subprotocol_name: [u8; 3],
/// Fork block to check /// Fork block to check
pub fork_block: Option<(BlockNumber, H256)>, pub fork_block: Option<(BlockNumber, H256)>,
/// Enable snapshot sync /// Enable snapshot sync
pub warp_sync: bool, pub warp_sync: bool,
/// Enable light client server.
pub serve_light: bool,
} }
impl Default for SyncConfig { impl Default for SyncConfig {
@ -60,8 +65,10 @@ impl Default for SyncConfig {
download_old_blocks: true, download_old_blocks: true,
network_id: 1, network_id: 1,
subprotocol_name: *b"eth", subprotocol_name: *b"eth",
light_subprotocol_name: *b"les",
fork_block: None, fork_block: None,
warp_sync: false, warp_sync: false,
serve_light: false,
} }
} }
} }
@ -85,14 +92,18 @@ pub trait SyncProvider: Send + Sync {
} }
/// Transaction stats /// Transaction stats
#[derive(Debug, Binary)] #[derive(Debug)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct TransactionStats { pub struct TransactionStats {
/// Block number where this TX was first seen.
pub first_seen: u64, pub first_seen: u64,
/// Peers it was propagated to.
pub propagated_to: BTreeMap<H512, usize>, pub propagated_to: BTreeMap<H512, usize>,
} }
/// Peer connection information /// Peer connection information
#[derive(Debug, Binary)] #[derive(Debug)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct PeerInfo { pub struct PeerInfo {
/// Public node id /// Public node id
pub id: Option<String>, pub id: Option<String>,
@ -112,51 +123,93 @@ pub struct PeerInfo {
pub eth_difficulty: Option<U256>, pub eth_difficulty: Option<U256>,
} }
/// EthSync initialization parameters.
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct Params {
/// Configuration.
pub config: SyncConfig,
/// Blockchain client.
pub chain: Arc<BlockChainClient>,
/// Snapshot service.
pub snapshot_service: Arc<SnapshotService>,
/// Light data provider.
pub provider: Arc<::light::Provider>,
/// Network layer configuration.
pub network_config: NetworkConfiguration,
}
/// Ethereum network protocol handler /// Ethereum network protocol handler
pub struct EthSync { pub struct EthSync {
/// Network service /// Network service
network: NetworkService, network: NetworkService,
/// Protocol handler /// Main (eth/par) protocol handler
handler: Arc<SyncProtocolHandler>, sync_handler: Arc<SyncProtocolHandler>,
/// Light (les) protocol handler
light_proto: Option<Arc<LightProtocol>>,
/// The main subprotocol name /// The main subprotocol name
subprotocol_name: [u8; 3], subprotocol_name: [u8; 3],
/// Configuration /// Light subprotocol name.
config: NetworkConfiguration, light_subprotocol_name: [u8; 3],
} }
impl EthSync { impl EthSync {
/// Creates and register protocol with the network service /// Creates and register protocol with the network service
pub fn new(config: SyncConfig, chain: Arc<BlockChainClient>, snapshot_service: Arc<SnapshotService>, network_config: NetworkConfiguration) -> Result<Arc<EthSync>, NetworkError> { pub fn new(params: Params) -> Result<Arc<EthSync>, NetworkError> {
let chain_sync = ChainSync::new(config, &*chain); let pruning_info = params.chain.pruning_info();
let service = try!(NetworkService::new(try!(network_config.clone().into_basic()))); let light_proto = match params.config.serve_light {
let sync = Arc::new(EthSync{ false => None,
true => Some({
let light_params = LightParams {
network_id: params.config.network_id,
flow_params: Default::default(),
capabilities: Capabilities {
serve_headers: true,
serve_chain_since: Some(pruning_info.earliest_chain),
serve_state_since: Some(pruning_info.earliest_state),
tx_relay: true,
},
};
let mut light_proto = LightProtocol::new(params.provider, light_params);
light_proto.add_handler(Box::new(TxRelay(params.chain.clone())));
Arc::new(light_proto)
})
};
let chain_sync = ChainSync::new(params.config, &*params.chain);
let service = try!(NetworkService::new(try!(params.network_config.clone().into_basic())));
let sync = Arc::new(EthSync {
network: service, network: service,
handler: Arc::new(SyncProtocolHandler { sync_handler: Arc::new(SyncProtocolHandler {
sync: RwLock::new(chain_sync), sync: RwLock::new(chain_sync),
chain: chain, chain: params.chain,
snapshot_service: snapshot_service, snapshot_service: params.snapshot_service,
overlay: RwLock::new(HashMap::new()), overlay: RwLock::new(HashMap::new()),
}), }),
subprotocol_name: config.subprotocol_name, light_proto: light_proto,
config: network_config, subprotocol_name: params.config.subprotocol_name,
light_subprotocol_name: params.config.light_subprotocol_name,
}); });
Ok(sync) Ok(sync)
} }
} }
#[ipc(client_ident="SyncClient")] #[cfg_attr(feature = "ipc", ipc(client_ident="SyncClient"))]
impl SyncProvider for EthSync { impl SyncProvider for EthSync {
/// Get sync status /// Get sync status
fn status(&self) -> SyncStatus { fn status(&self) -> SyncStatus {
self.handler.sync.write().status() self.sync_handler.sync.write().status()
} }
/// Get sync peers /// Get sync peers
fn peers(&self) -> Vec<PeerInfo> { fn peers(&self) -> Vec<PeerInfo> {
// TODO: [rob] LES peers/peer info
self.network.with_context_eval(self.subprotocol_name, |context| { self.network.with_context_eval(self.subprotocol_name, |context| {
let sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay); let sync_io = NetSyncIo::new(context, &*self.sync_handler.chain, &*self.sync_handler.snapshot_service, &self.sync_handler.overlay);
self.handler.sync.write().peers(&sync_io) self.sync_handler.sync.write().peers(&sync_io)
}).unwrap_or(Vec::new()) }).unwrap_or(Vec::new())
} }
@ -165,7 +218,7 @@ impl SyncProvider for EthSync {
} }
fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats> { fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats> {
let sync = self.handler.sync.read(); let sync = self.sync_handler.sync.read();
sync.transactions_stats() sync.transactions_stats()
.iter() .iter()
.map(|(hash, stats)| (*hash, stats.into())) .map(|(hash, stats)| (*hash, stats.into()))
@ -226,9 +279,11 @@ impl ChainNotify for EthSync {
sealed: Vec<H256>, sealed: Vec<H256>,
_duration: u64) _duration: u64)
{ {
use light::net::Announcement;
self.network.with_context(self.subprotocol_name, |context| { self.network.with_context(self.subprotocol_name, |context| {
let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay); let mut sync_io = NetSyncIo::new(context, &*self.sync_handler.chain, &*self.sync_handler.snapshot_service, &self.sync_handler.overlay);
self.handler.sync.write().chain_new_blocks( self.sync_handler.sync.write().chain_new_blocks(
&mut sync_io, &mut sync_io,
&imported, &imported,
&invalid, &invalid,
@ -236,6 +291,25 @@ impl ChainNotify for EthSync {
&retracted, &retracted,
&sealed); &sealed);
}); });
self.network.with_context(self.light_subprotocol_name, |context| {
let light_proto = match self.light_proto.as_ref() {
Some(lp) => lp,
None => return,
};
let chain_info = self.sync_handler.chain.chain_info();
light_proto.make_announcement(context, Announcement {
head_hash: chain_info.best_block_hash,
head_num: chain_info.best_block_number,
head_td: chain_info.total_difficulty,
reorg_depth: 0, // recalculated on a per-peer basis.
serve_headers: false, // these fields consist of _changes_ in capability.
serve_state_since: None,
serve_chain_since: None,
tx_relay: false,
})
})
} }
fn start(&self) { fn start(&self) {
@ -244,19 +318,36 @@ impl ChainNotify for EthSync {
Err(err) => warn!("Error starting network: {}", err), Err(err) => warn!("Error starting network: {}", err),
_ => {}, _ => {},
} }
self.network.register_protocol(self.handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[62u8, 63u8]) self.network.register_protocol(self.sync_handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[62u8, 63u8])
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e)); .unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
// register the warp sync subprotocol // register the warp sync subprotocol
self.network.register_protocol(self.handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8]) self.network.register_protocol(self.sync_handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8])
.unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e)); .unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e));
// register the light protocol.
if let Some(light_proto) = self.light_proto.as_ref().map(|x| x.clone()) {
self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS)
.unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e));
}
} }
fn stop(&self) { fn stop(&self) {
self.handler.snapshot_service.abort_restore(); self.sync_handler.snapshot_service.abort_restore();
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
} }
} }
/// LES event handler.
/// Simply queues transactions from light client peers.
struct TxRelay(Arc<BlockChainClient>);
impl LightHandler for TxRelay {
fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) {
trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer());
self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect())
}
}
impl IpcConfig for ManageNetwork { } impl IpcConfig for ManageNetwork { }
impl IpcConfig for SyncProvider { } impl IpcConfig for SyncProvider { }
@ -279,7 +370,7 @@ pub trait ManageNetwork : Send + Sync {
} }
#[ipc(client_ident="NetworkManagerClient")] #[cfg_attr(feature = "ipc", ipc(client_ident="NetworkManagerClient"))]
impl ManageNetwork for EthSync { impl ManageNetwork for EthSync {
fn accept_unreserved_peers(&self) { fn accept_unreserved_peers(&self) {
self.network.set_non_reserved_mode(NonReservedPeerMode::Accept); self.network.set_non_reserved_mode(NonReservedPeerMode::Accept);
@ -303,9 +394,14 @@ impl ManageNetwork for EthSync {
fn stop_network(&self) { fn stop_network(&self) {
self.network.with_context(self.subprotocol_name, |context| { self.network.with_context(self.subprotocol_name, |context| {
let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay); let mut sync_io = NetSyncIo::new(context, &*self.sync_handler.chain, &*self.sync_handler.snapshot_service, &self.sync_handler.overlay);
self.handler.sync.write().abort(&mut sync_io); self.sync_handler.sync.write().abort(&mut sync_io);
}); });
if let Some(light_proto) = self.light_proto.as_ref() {
light_proto.abort();
}
self.stop(); self.stop();
} }
@ -315,7 +411,8 @@ impl ManageNetwork for EthSync {
} }
/// IP fiter /// IP fiter
#[derive(Binary, Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub enum AllowIP { pub enum AllowIP {
/// Connect to any address /// Connect to any address
All, All,
@ -337,7 +434,8 @@ impl AllowIP {
} }
} }
#[derive(Binary, Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
/// Network service configuration /// Network service configuration
pub struct NetworkConfiguration { pub struct NetworkConfiguration {
/// Directory path to store general network configuration. None means nothing will be saved /// Directory path to store general network configuration. None means nothing will be saved
@ -375,26 +473,18 @@ pub struct NetworkConfiguration {
} }
impl NetworkConfiguration { impl NetworkConfiguration {
/// Create a new default config.
pub fn new() -> Self { pub fn new() -> Self {
From::from(BasicNetworkConfiguration::new()) From::from(BasicNetworkConfiguration::new())
} }
/// Create a new local config.
pub fn new_local() -> Self { pub fn new_local() -> Self {
From::from(BasicNetworkConfiguration::new_local()) From::from(BasicNetworkConfiguration::new_local())
} }
fn validate(&self) -> Result<(), AddrParseError> { /// Attempt to convert this config into a BasicNetworkConfiguration.
if let Some(ref addr) = self.listen_address {
try!(SocketAddr::from_str(&addr));
}
if let Some(ref addr) = self.public_address {
try!(SocketAddr::from_str(&addr));
}
Ok(())
}
pub fn into_basic(self) -> Result<BasicNetworkConfiguration, AddrParseError> { pub fn into_basic(self) -> Result<BasicNetworkConfiguration, AddrParseError> {
Ok(BasicNetworkConfiguration { Ok(BasicNetworkConfiguration {
config_path: self.config_path, config_path: self.config_path,
net_config_path: self.net_config_path, net_config_path: self.net_config_path,
@ -447,9 +537,14 @@ impl From<BasicNetworkConfiguration> for NetworkConfiguration {
} }
} }
#[derive(Debug, Binary, Clone)] /// Configuration for IPC service.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct ServiceConfiguration { pub struct ServiceConfiguration {
/// Sync config.
pub sync: SyncConfig, pub sync: SyncConfig,
/// Network configuration.
pub net: NetworkConfiguration, pub net: NetworkConfiguration,
/// IPC path.
pub io_path: String, pub io_path: String,
} }

View File

@ -37,6 +37,8 @@ extern crate semver;
extern crate parking_lot; extern crate parking_lot;
extern crate rlp; extern crate rlp;
extern crate ethcore_light as light;
#[cfg(test)] extern crate ethcore_devtools as devtools; #[cfg(test)] extern crate ethcore_devtools as devtools;
#[cfg(test)] extern crate ethkey; #[cfg(test)] extern crate ethkey;
@ -59,12 +61,16 @@ mod transactions_stats;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
#[cfg(feature = "ipc")]
mod api { mod api {
#![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues #![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues
include!(concat!(env!("OUT_DIR"), "/api.rs")); include!(concat!(env!("OUT_DIR"), "/api.rs"));
} }
pub use api::{EthSync, SyncProvider, SyncClient, NetworkManagerClient, ManageNetwork, SyncConfig, #[cfg(not(feature = "ipc"))]
mod api;
pub use api::{EthSync, Params, SyncProvider, ManageNetwork, SyncConfig,
ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP, TransactionStats}; ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP, TransactionStats};
pub use chain::{SyncStatus, SyncState}; pub use chain::{SyncStatus, SyncState};
pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError}; pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError};

View File

@ -18,4 +18,6 @@ pub mod helpers;
pub mod snapshot; pub mod snapshot;
mod chain; mod chain;
mod consensus; mod consensus;
#[cfg(feature = "ipc")]
mod rpc; mod rpc;