diff --git a/ethcore/light/src/client.rs b/ethcore/light/src/client.rs
index 0035406dc..fcfff81e6 100644
--- a/ethcore/light/src/client.rs
+++ b/ethcore/light/src/client.rs
@@ -14,8 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see .
-//! Light client implementation. Used for raw data queries as well as the header
-//! sync.
+//! Light client implementation. Stores data from light sync
use std::sync::Arc;
@@ -29,7 +28,7 @@ use ethcore::transaction::SignedTransaction;
use ethcore::blockchain_info::BlockChainInfo;
use io::IoChannel;
-use util::hash::H256;
+use util::hash::{H256, H256FastMap};
use util::{Bytes, Mutex};
use provider::Provider;
@@ -37,9 +36,10 @@ use request;
/// Light client implementation.
pub struct Client {
- engine: Arc,
+ _engine: Arc,
header_queue: HeaderQueue,
- message_channel: Mutex>,
+ _message_channel: Mutex>,
+ tx_pool: Mutex>,
}
impl Client {
@@ -55,12 +55,17 @@ impl Client {
false
}
+ /// Import a local transaction.
+ pub fn import_own_transaction(&self, tx: SignedTransaction) {
+ self.tx_pool.lock().insert(tx.hash(), tx);
+ }
+
/// Fetch a vector of all pending transactions.
pub fn pending_transactions(&self) -> Vec {
- vec![]
+ self.tx_pool.lock().values().cloned().collect()
}
- /// Inquire about the status of a given block.
+ /// Inquire about the status of a given block (or header).
pub fn status(&self, _id: BlockId) -> BlockStatus {
BlockStatus::Unknown
}
diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs
index 7fa2f5911..f9008ac7c 100644
--- a/ethcore/light/src/lib.rs
+++ b/ethcore/light/src/lib.rs
@@ -28,8 +28,7 @@
//! It starts by performing a header-only sync, verifying random samples
//! of members of the chain to varying degrees.
-// TODO: remove when integrating with the rest of parity.
-#![allow(dead_code)]
+#![deny(missing_docs)]
pub mod client;
pub mod net;
diff --git a/ethcore/light/src/net/context.rs b/ethcore/light/src/net/context.rs
index c05e69b0f..96c217895 100644
--- a/ethcore/light/src/net/context.rs
+++ b/ethcore/light/src/net/context.rs
@@ -26,95 +26,95 @@ use request::Request;
/// disconnecting peers. This is used as a generalization of the portions
/// of a p2p network which the light protocol structure makes use of.
pub trait IoContext {
- /// Send a packet to a specific peer.
- fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec);
+ /// Send a packet to a specific peer.
+ fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec);
- /// Respond to a peer's message. Only works if this context is a byproduct
- /// of a packet handler.
- fn respond(&self, packet_id: u8, packet_body: Vec);
+ /// Respond to a peer's message. Only works if this context is a byproduct
+ /// of a packet handler.
+ fn respond(&self, packet_id: u8, packet_body: Vec);
- /// Disconnect a peer.
- fn disconnect_peer(&self, peer: PeerId);
+ /// Disconnect a peer.
+ fn disconnect_peer(&self, peer: PeerId);
- /// Disable a peer -- this is a disconnect + a time-out.
- fn disable_peer(&self, peer: PeerId);
+ /// Disable a peer -- this is a disconnect + a time-out.
+ fn disable_peer(&self, peer: PeerId);
- /// Get a peer's protocol version.
- fn protocol_version(&self, peer: PeerId) -> Option;
+ /// Get a peer's protocol version.
+ fn protocol_version(&self, peer: PeerId) -> Option;
}
impl<'a> IoContext for NetworkContext<'a> {
- fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec) {
- if let Err(e) = self.send(peer, packet_id, packet_body) {
- debug!(target: "les", "Error sending packet to peer {}: {}", peer, e);
- }
- }
+ fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec) {
+ if let Err(e) = self.send(peer, packet_id, packet_body) {
+ debug!(target: "les", "Error sending packet to peer {}: {}", peer, e);
+ }
+ }
- fn respond(&self, packet_id: u8, packet_body: Vec) {
- if let Err(e) = self.respond(packet_id, packet_body) {
- debug!(target: "les", "Error responding to peer message: {}", e);
- }
- }
+ fn respond(&self, packet_id: u8, packet_body: Vec) {
+ if let Err(e) = self.respond(packet_id, packet_body) {
+ debug!(target: "les", "Error responding to peer message: {}", e);
+ }
+ }
- fn disconnect_peer(&self, peer: PeerId) {
- NetworkContext::disconnect_peer(self, peer);
- }
+ fn disconnect_peer(&self, peer: PeerId) {
+ NetworkContext::disconnect_peer(self, peer);
+ }
- fn disable_peer(&self, peer: PeerId) {
- NetworkContext::disable_peer(self, peer);
- }
+ fn disable_peer(&self, peer: PeerId) {
+ NetworkContext::disable_peer(self, peer);
+ }
- fn protocol_version(&self, peer: PeerId) -> Option {
- self.protocol_version(self.subprotocol_name(), peer)
- }
+ fn protocol_version(&self, peer: PeerId) -> Option {
+ self.protocol_version(self.subprotocol_name(), peer)
+ }
}
/// Context for a protocol event.
pub trait EventContext {
- /// Get the peer relevant to the event e.g. message sender,
- /// disconnected/connected peer.
- fn peer(&self) -> PeerId;
+ /// Get the peer relevant to the event e.g. message sender,
+ /// disconnected/connected peer.
+ fn peer(&self) -> PeerId;
- /// Make a request from a peer.
- fn request_from(&self, peer: PeerId, request: Request) -> Result;
+ /// Make a request from a peer.
+ fn request_from(&self, peer: PeerId, request: Request) -> Result;
- /// Make an announcement of new capabilities to the rest of the peers.
- // TODO: maybe just put this on a timer in LightProtocol?
- fn make_announcement(&self, announcement: Announcement);
+ /// Make an announcement of new capabilities to the rest of the peers.
+ // TODO: maybe just put this on a timer in LightProtocol?
+ fn make_announcement(&self, announcement: Announcement);
- /// Disconnect a peer.
- fn disconnect_peer(&self, peer: PeerId);
+ /// Disconnect a peer.
+ fn disconnect_peer(&self, peer: PeerId);
- /// Disable a peer.
- fn disable_peer(&self, peer: PeerId);
+ /// Disable a peer.
+ fn disable_peer(&self, peer: PeerId);
}
/// Concrete implementation of `EventContext` over the light protocol struct and
/// an io context.
pub struct Ctx<'a> {
- /// Io context to enable immediate response to events.
- pub io: &'a IoContext,
- /// Protocol implementation.
- pub proto: &'a LightProtocol,
- /// Relevant peer for event.
- pub peer: PeerId,
+ /// Io context to enable immediate response to events.
+ pub io: &'a IoContext,
+ /// Protocol implementation.
+ pub proto: &'a LightProtocol,
+ /// Relevant peer for event.
+ pub peer: PeerId,
}
impl<'a> EventContext for Ctx<'a> {
- fn peer(&self) -> PeerId { self.peer }
- fn request_from(&self, peer: PeerId, request: Request) -> Result {
- self.proto.request_from(self.io, &peer, request)
- }
+ fn peer(&self) -> PeerId { self.peer }
+ fn request_from(&self, peer: PeerId, request: Request) -> Result {
+ self.proto.request_from(self.io, &peer, request)
+ }
- fn make_announcement(&self, announcement: Announcement) {
- self.proto.make_announcement(self.io, announcement);
- }
+ fn make_announcement(&self, announcement: Announcement) {
+ self.proto.make_announcement(self.io, announcement);
+ }
- fn disconnect_peer(&self, peer: PeerId) {
- self.io.disconnect_peer(peer);
- }
+ fn disconnect_peer(&self, peer: PeerId) {
+ self.io.disconnect_peer(peer);
+ }
- fn disable_peer(&self, peer: PeerId) {
- self.io.disable_peer(peer);
- }
+ fn disable_peer(&self, peer: PeerId) {
+ self.io.disable_peer(peer);
+ }
}
\ No newline at end of file
diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs
index 481740a48..e5bf0cb2b 100644
--- a/ethcore/light/src/net/mod.rs
+++ b/ethcore/light/src/net/mod.rs
@@ -34,7 +34,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use provider::Provider;
-use request::{self, Request};
+use request::{self, HashOrNumber, Request};
use self::buffer_flow::{Buffer, FlowParams};
use self::context::Ctx;
@@ -57,13 +57,13 @@ const TIMEOUT_INTERVAL_MS: u64 = 1000;
// minimum interval between updates.
const UPDATE_INTERVAL_MS: i64 = 5000;
-// Supported protocol versions.
+/// Supported protocol versions.
pub const PROTOCOL_VERSIONS: &'static [u8] = &[1];
-// Max protocol version.
+/// Max protocol version.
pub const MAX_PROTOCOL_VERSION: u8 = 1;
-// Packet count for LES.
+/// Packet count for LES.
pub const PACKET_COUNT: u8 = 15;
// packet ID definitions.
@@ -102,6 +102,18 @@ mod packet {
pub const HEADER_PROOFS: u8 = 0x0e;
}
+// timeouts for different kinds of requests. all values are in milliseconds.
+// TODO: variable timeouts based on request count.
+mod timeout {
+ pub const HANDSHAKE: i64 = 2500;
+ pub const HEADERS: i64 = 5000;
+ pub const BODIES: i64 = 5000;
+ pub const RECEIPTS: i64 = 3500;
+ pub const PROOFS: i64 = 4000;
+ pub const CONTRACT_CODES: i64 = 5000;
+ pub const HEADER_PROOFS: i64 = 3500;
+}
+
/// A request id.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ReqId(usize);
@@ -111,7 +123,6 @@ pub struct ReqId(usize);
struct PendingPeer {
sent_head: H256,
last_update: SteadyTime,
- proto_version: u8,
}
// data about each peer.
@@ -122,7 +133,6 @@ struct Peer {
remote_flow: Option<(Buffer, FlowParams)>,
sent_head: H256, // last head we've given them.
last_update: SteadyTime,
- proto_version: u8,
}
impl Peer {
@@ -443,17 +453,54 @@ impl LightProtocol {
}
};
- // if something went wrong, figure out how much to punish the peer.
if let Err(e) = res {
- match e.punishment() {
- Punishment::None => {}
- Punishment::Disconnect => {
- debug!(target: "les", "Disconnecting peer {}: {}", peer, e);
- io.disconnect_peer(*peer)
- }
- Punishment::Disable => {
- debug!(target: "les", "Disabling peer {}: {}", peer, e);
- io.disable_peer(*peer)
+ punish(*peer, io, e);
+ }
+ }
+
+ // check timeouts and punish peers.
+ fn timeout_check(&self, io: &IoContext) {
+ let now = SteadyTime::now();
+
+ // handshake timeout
+ {
+ let mut pending = self.pending_peers.write();
+ let slowpokes: Vec<_> = pending.iter()
+ .filter(|&(_, ref peer)| {
+ peer.last_update + Duration::milliseconds(timeout::HANDSHAKE) <= now
+ })
+ .map(|(&p, _)| p)
+ .collect();
+
+ for slowpoke in slowpokes {
+ debug!(target: "les", "Peer {} handshake timed out", slowpoke);
+ pending.remove(&slowpoke);
+ io.disconnect_peer(slowpoke);
+ }
+ }
+
+ // request timeouts
+ {
+ for r in self.pending_requests.read().values() {
+ let kind_timeout = match r.request.kind() {
+ request::Kind::Headers => timeout::HEADERS,
+ request::Kind::Bodies => timeout::BODIES,
+ request::Kind::Receipts => timeout::RECEIPTS,
+ request::Kind::StateProofs => timeout::PROOFS,
+ request::Kind::Codes => timeout::CONTRACT_CODES,
+ request::Kind::HeaderProofs => timeout::HEADER_PROOFS,
+ };
+
+ if r.timestamp + Duration::milliseconds(kind_timeout) <= now {
+ debug!(target: "les", "Request for {:?} from peer {} timed out",
+ r.request.kind(), r.peer_id);
+
+ // keep the request in the `pending` set for now so
+ // on_disconnect will pass unfulfilled ReqIds to handlers.
+ // in the case that a response is received after this, the
+ // disconnect won't be cancelled but the ReqId won't be
+ // marked as abandoned.
+ io.disconnect_peer(r.peer_id);
}
}
}
@@ -463,19 +510,37 @@ impl LightProtocol {
impl LightProtocol {
// called when a peer connects.
fn on_connect(&self, peer: &PeerId, io: &IoContext) {
- let peer = *peer;
+ let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) {
+ Ok(pv) => pv,
+ Err(e) => { punish(*peer, io, e); return }
+ };
- trace!(target: "les", "Peer {} connecting", peer);
-
- match self.send_status(peer, io) {
- Ok(pending_peer) => {
- self.pending_peers.write().insert(peer, pending_peer);
- }
- Err(e) => {
- trace!(target: "les", "Error while sending status: {}", e);
- io.disconnect_peer(peer);
- }
+ if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() {
+ punish(*peer, io, Error::UnsupportedProtocolVersion(proto_version));
+ return;
}
+
+ let chain_info = self.provider.chain_info();
+
+ let status = Status {
+ head_td: chain_info.total_difficulty,
+ head_hash: chain_info.best_block_hash,
+ head_num: chain_info.best_block_number,
+ genesis_hash: chain_info.genesis_hash,
+ protocol_version: proto_version as u32, // match peer proto version
+ network_id: self.network_id,
+ last_head: None,
+ };
+
+ let capabilities = self.capabilities.read().clone();
+ let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params));
+
+ self.pending_peers.write().insert(*peer, PendingPeer {
+ sent_head: chain_info.best_block_hash,
+ last_update: SteadyTime::now(),
+ });
+
+ io.send(*peer, packet::STATUS, status_packet);
}
// called when a peer disconnects.
@@ -508,38 +573,6 @@ impl LightProtocol {
}
}
- // send status to a peer.
- fn send_status(&self, peer: PeerId, io: &IoContext) -> Result {
- let proto_version = try!(io.protocol_version(peer).ok_or(Error::WrongNetwork));
-
- if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() {
- return Err(Error::UnsupportedProtocolVersion(proto_version));
- }
-
- let chain_info = self.provider.chain_info();
-
- let status = Status {
- head_td: chain_info.total_difficulty,
- head_hash: chain_info.best_block_hash,
- head_num: chain_info.best_block_number,
- genesis_hash: chain_info.genesis_hash,
- protocol_version: proto_version as u32, // match peer proto version
- network_id: self.network_id,
- last_head: None,
- };
-
- let capabilities = self.capabilities.read().clone();
- let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params));
-
- io.send(peer, packet::STATUS, status_packet);
-
- Ok(PendingPeer {
- sent_head: chain_info.best_block_hash,
- last_update: SteadyTime::now(),
- proto_version: proto_version,
- })
- }
-
// Handle status message from peer.
fn status(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
let pending = match self.pending_peers.write().remove(peer) {
@@ -570,7 +603,6 @@ impl LightProtocol {
remote_flow: remote_flow,
sent_head: pending.sent_head,
last_update: pending.last_update,
- proto_version: pending.proto_version,
}));
for handler in &self.handlers {
@@ -630,7 +662,7 @@ impl LightProtocol {
}
// Handle a request for block headers.
- fn get_block_headers(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
+ fn get_block_headers(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_HEADERS: usize = 512;
let peers = self.peers.read();
@@ -645,18 +677,21 @@ impl LightProtocol {
let mut peer = peer.lock();
let req_id: u64 = try!(data.val_at(0));
+ let data = try!(data.at(1));
- let block = {
- let rlp = try!(data.at(1));
- (try!(rlp.val_at(0)), try!(rlp.val_at(1)))
+ let start_block = {
+ if try!(data.at(0)).size() == 32 {
+ HashOrNumber::Hash(try!(data.val_at(0)))
+ } else {
+ HashOrNumber::Number(try!(data.val_at(0)))
+ }
};
let req = request::Headers {
- block_num: block.0,
- block_hash: block.1,
- max: ::std::cmp::min(MAX_HEADERS, try!(data.val_at(2))),
- skip: try!(data.val_at(3)),
- reverse: try!(data.val_at(4)),
+ start: start_block,
+ max: ::std::cmp::min(MAX_HEADERS, try!(data.val_at(1))),
+ skip: try!(data.val_at(2)),
+ reverse: try!(data.val_at(3)),
};
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Headers, req.max));
@@ -667,8 +702,8 @@ impl LightProtocol {
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::BLOCK_HEADERS, {
- let mut stream = RlpStream::new_list(response.len() + 2);
- stream.append(&req_id).append(&cur_buffer);
+ let mut stream = RlpStream::new_list(3);
+ stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
for header in response {
stream.append_raw(&header, 1);
@@ -683,7 +718,7 @@ impl LightProtocol {
// Receive a response for block headers.
fn block_headers(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
let req_id = try!(self.pre_verify_response(peer, request::Kind::Headers, &raw));
- let raw_headers: Vec<_> = raw.iter().skip(2).map(|x| x.as_raw().to_owned()).collect();
+ let raw_headers: Vec<_> = try!(raw.at(2)).iter().map(|x| x.as_raw().to_owned()).collect();
for handler in &self.handlers {
handler.on_block_headers(&Ctx {
@@ -713,7 +748,7 @@ impl LightProtocol {
let req_id: u64 = try!(data.val_at(0));
let req = request::Bodies {
- block_hashes: try!(data.iter().skip(1).take(MAX_BODIES).map(|x| x.as_val()).collect())
+ block_hashes: try!(try!(data.at(1)).iter().take(MAX_BODIES).map(|x| x.as_val()).collect())
};
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Bodies, req.block_hashes.len()));
@@ -726,8 +761,8 @@ impl LightProtocol {
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::BLOCK_BODIES, {
- let mut stream = RlpStream::new_list(response.len() + 2);
- stream.append(&req_id).append(&cur_buffer);
+ let mut stream = RlpStream::new_list(3);
+ stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
for body in response {
stream.append_raw(&body, 1);
@@ -742,7 +777,7 @@ impl LightProtocol {
// Receive a response for block bodies.
fn block_bodies(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
let req_id = try!(self.pre_verify_response(peer, request::Kind::Bodies, &raw));
- let raw_bodies: Vec = raw.iter().skip(2).map(|x| x.as_raw().to_owned()).collect();
+ let raw_bodies: Vec = try!(raw.at(2)).iter().map(|x| x.as_raw().to_owned()).collect();
for handler in &self.handlers {
handler.on_block_bodies(&Ctx {
@@ -772,7 +807,7 @@ impl LightProtocol {
let req_id: u64 = try!(data.val_at(0));
let req = request::Receipts {
- block_hashes: try!(data.iter().skip(1).take(MAX_RECEIPTS).map(|x| x.as_val()).collect())
+ block_hashes: try!(try!(data.at(1)).iter().take(MAX_RECEIPTS).map(|x| x.as_val()).collect())
};
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Receipts, req.block_hashes.len()));
@@ -785,8 +820,8 @@ impl LightProtocol {
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::RECEIPTS, {
- let mut stream = RlpStream::new_list(response.len() + 2);
- stream.append(&req_id).append(&cur_buffer);
+ let mut stream = RlpStream::new_list(3);
+ stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
for receipts in response {
stream.append_raw(&receipts, 1);
@@ -801,9 +836,8 @@ impl LightProtocol {
// Receive a response for receipts.
fn receipts(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
let req_id = try!(self.pre_verify_response(peer, request::Kind::Receipts, &raw));
- let raw_receipts: Vec> = try!(raw
+ let raw_receipts: Vec> = try!(try!(raw.at(2))
.iter()
- .skip(2)
.map(|x| x.as_val())
.collect());
@@ -835,7 +869,7 @@ impl LightProtocol {
let req_id: u64 = try!(data.val_at(0));
let req = {
- let requests: Result, Error> = data.iter().skip(1).take(MAX_PROOFS).map(|x| {
+ let requests: Result, Error> = try!(data.at(1)).iter().take(MAX_PROOFS).map(|x| {
Ok(request::StateProof {
block: try!(x.val_at(0)),
key1: try!(x.val_at(1)),
@@ -859,8 +893,8 @@ impl LightProtocol {
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::PROOFS, {
- let mut stream = RlpStream::new_list(response.len() + 2);
- stream.append(&req_id).append(&cur_buffer);
+ let mut stream = RlpStream::new_list(3);
+ stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
for proof in response {
stream.append_raw(&proof, 1);
@@ -876,8 +910,7 @@ impl LightProtocol {
fn proofs(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
let req_id = try!(self.pre_verify_response(peer, request::Kind::StateProofs, &raw));
- let raw_proofs: Vec> = raw.iter()
- .skip(2)
+ let raw_proofs: Vec> = try!(raw.at(2)).iter()
.map(|x| x.iter().map(|node| node.as_raw().to_owned()).collect())
.collect();
@@ -909,7 +942,7 @@ impl LightProtocol {
let req_id: u64 = try!(data.val_at(0));
let req = {
- let requests: Result, Error> = data.iter().skip(1).take(MAX_CODES).map(|x| {
+ let requests: Result, Error> = try!(data.at(1)).iter().take(MAX_CODES).map(|x| {
Ok(request::ContractCode {
block_hash: try!(x.val_at(0)),
account_key: try!(x.val_at(1)),
@@ -931,8 +964,8 @@ impl LightProtocol {
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::CONTRACT_CODES, {
- let mut stream = RlpStream::new_list(response.len() + 2);
- stream.append(&req_id).append(&cur_buffer);
+ let mut stream = RlpStream::new_list(3);
+ stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
for code in response {
stream.append(&code);
@@ -948,7 +981,7 @@ impl LightProtocol {
fn contract_code(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
let req_id = try!(self.pre_verify_response(peer, request::Kind::Codes, &raw));
- let raw_code: Vec = try!(raw.iter().skip(2).map(|x| x.as_val()).collect());
+ let raw_code: Vec = try!(try!(raw.at(2)).iter().map(|x| x.as_val()).collect());
for handler in &self.handlers {
handler.on_code(&Ctx {
@@ -978,7 +1011,7 @@ impl LightProtocol {
let req_id: u64 = try!(data.val_at(0));
let req = {
- let requests: Result, Error> = data.iter().skip(1).take(MAX_PROOFS).map(|x| {
+ let requests: Result, Error> = try!(data.at(1)).iter().take(MAX_PROOFS).map(|x| {
Ok(request::HeaderProof {
cht_number: try!(x.val_at(0)),
block_number: try!(x.val_at(1)),
@@ -1001,8 +1034,8 @@ impl LightProtocol {
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::HEADER_PROOFS, {
- let mut stream = RlpStream::new_list(response.len() + 2);
- stream.append(&req_id).append(&cur_buffer);
+ let mut stream = RlpStream::new_list(3);
+ stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
for proof in response {
stream.append_raw(&proof, 1);
@@ -1023,9 +1056,8 @@ impl LightProtocol {
))
}
-
let req_id = try!(self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw));
- let raw_proofs: Vec<_> = try!(raw.iter().skip(2).map(decode_res).collect());
+ let raw_proofs: Vec<_> = try!(try!(raw.at(2)).iter().map(decode_res).collect());
for handler in &self.handlers {
handler.on_header_proofs(&Ctx {
@@ -1058,6 +1090,21 @@ impl LightProtocol {
}
}
+// if something went wrong, figure out how much to punish the peer.
+fn punish(peer: PeerId, io: &IoContext, e: Error) {
+ match e.punishment() {
+ Punishment::None => {}
+ Punishment::Disconnect => {
+ debug!(target: "les", "Disconnecting peer {}: {}", peer, e);
+ io.disconnect_peer(peer)
+ }
+ Punishment::Disable => {
+ debug!(target: "les", "Disabling peer {}: {}", peer, e);
+ io.disable_peer(peer)
+ }
+ }
+}
+
impl NetworkProtocolHandler for LightProtocol {
fn initialize(&self, io: &NetworkContext) {
io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS).expect("Error registering sync timer.");
@@ -1075,11 +1122,9 @@ impl NetworkProtocolHandler for LightProtocol {
self.on_disconnect(*peer, io);
}
- fn timeout(&self, _io: &NetworkContext, timer: TimerToken) {
+ fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
match timer {
- TIMEOUT => {
- // broadcast transactions to peers.
- }
+ TIMEOUT => self.timeout_check(io),
_ => warn!(target: "les", "received timeout on unknown token {}", timer),
}
}
@@ -1089,20 +1134,24 @@ impl NetworkProtocolHandler for LightProtocol {
fn encode_request(req: &Request, req_id: usize) -> Vec {
match *req {
Request::Headers(ref headers) => {
- let mut stream = RlpStream::new_list(5);
+ let mut stream = RlpStream::new_list(2);
+ stream.append(&req_id).begin_list(4);
+
+ match headers.start {
+ HashOrNumber::Hash(ref hash) => stream.append(hash),
+ HashOrNumber::Number(ref num) => stream.append(num),
+ };
+
stream
- .append(&req_id)
- .begin_list(2)
- .append(&headers.block_num)
- .append(&headers.block_hash)
.append(&headers.max)
.append(&headers.skip)
.append(&headers.reverse);
+
stream.out()
}
Request::Bodies(ref request) => {
- let mut stream = RlpStream::new_list(request.block_hashes.len() + 1);
- stream.append(&req_id);
+ let mut stream = RlpStream::new_list(2);
+ stream.append(&req_id).begin_list(request.block_hashes.len());
for hash in &request.block_hashes {
stream.append(hash);
@@ -1111,8 +1160,8 @@ fn encode_request(req: &Request, req_id: usize) -> Vec {
stream.out()
}
Request::Receipts(ref request) => {
- let mut stream = RlpStream::new_list(request.block_hashes.len() + 1);
- stream.append(&req_id);
+ let mut stream = RlpStream::new_list(2);
+ stream.append(&req_id).begin_list(request.block_hashes.len());
for hash in &request.block_hashes {
stream.append(hash);
@@ -1121,8 +1170,8 @@ fn encode_request(req: &Request, req_id: usize) -> Vec {
stream.out()
}
Request::StateProofs(ref request) => {
- let mut stream = RlpStream::new_list(request.requests.len() + 1);
- stream.append(&req_id);
+ let mut stream = RlpStream::new_list(2);
+ stream.append(&req_id).begin_list(request.requests.len());
for proof_req in &request.requests {
stream.begin_list(4)
@@ -1140,8 +1189,8 @@ fn encode_request(req: &Request, req_id: usize) -> Vec {
stream.out()
}
Request::Codes(ref request) => {
- let mut stream = RlpStream::new_list(request.code_requests.len() + 1);
- stream.append(&req_id);
+ let mut stream = RlpStream::new_list(2);
+ stream.append(&req_id).begin_list(request.code_requests.len());
for code_req in &request.code_requests {
stream.begin_list(2)
@@ -1152,8 +1201,8 @@ fn encode_request(req: &Request, req_id: usize) -> Vec {
stream.out()
}
Request::HeaderProofs(ref request) => {
- let mut stream = RlpStream::new_list(request.requests.len() + 1);
- stream.append(&req_id);
+ let mut stream = RlpStream::new_list(2);
+ stream.append(&req_id).begin_list(request.requests.len());
for proof_req in &request.requests {
stream.begin_list(3)
diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs
index 876432ce2..7c0928cdd 100644
--- a/ethcore/light/src/net/tests/mod.rs
+++ b/ethcore/light/src/net/tests/mod.rs
@@ -91,16 +91,27 @@ impl Provider for TestProvider {
}
fn block_headers(&self, req: request::Headers) -> Vec {
- let best_num = self.0.client.chain_info().best_block_number;
- let start_num = req.block_num;
+ use request::HashOrNumber;
+ use ethcore::views::HeaderView;
- match self.0.client.block_hash(BlockId::Number(req.block_num)) {
- Some(hash) if hash == req.block_hash => {}
- _=> {
- trace!(target: "les_provider", "unknown/non-canonical start block in header request: {:?}", (req.block_num, req.block_hash));
- return vec![]
+ let best_num = self.chain_info().best_block_number;
+ let start_num = match req.start {
+ HashOrNumber::Number(start_num) => start_num,
+ HashOrNumber::Hash(hash) => match self.0.client.block_header(BlockId::Hash(hash)) {
+ None => {
+ return Vec::new();
+ }
+ Some(header) => {
+ let num = HeaderView::new(&header).number();
+ if req.max == 1 || self.0.client.block_hash(BlockId::Number(num)) != Some(hash) {
+ // Non-canonical header or single header requested.
+ return vec![header];
+ }
+
+ num
+ }
}
- }
+ };
(0u64..req.max as u64)
.map(|x: u64| x.saturating_mul(req.skip + 1))
@@ -250,8 +261,7 @@ fn buffer_overflow() {
// 1000 requests is far too many for the default flow params.
let request = encode_request(&Request::Headers(Headers {
- block_num: 1,
- block_hash: provider.client.chain_info().genesis_hash,
+ start: 1.into(),
max: 1000,
skip: 0,
reverse: false,
@@ -284,8 +294,7 @@ fn get_block_headers() {
}
let request = Headers {
- block_num: 1,
- block_hash: provider.client.block_hash(BlockId::Number(1)).unwrap(),
+ start: 1.into(),
max: 10,
skip: 0,
reverse: false,
@@ -299,9 +308,9 @@ fn get_block_headers() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Headers, 10);
- let mut response_stream = RlpStream::new_list(12);
+ let mut response_stream = RlpStream::new_list(3);
- response_stream.append(&req_id).append(&new_buf);
+ response_stream.append(&req_id).append(&new_buf).begin_list(10);
for header in headers {
response_stream.append_raw(&header, 1);
}
@@ -346,9 +355,9 @@ fn get_block_bodies() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Bodies, 10);
- let mut response_stream = RlpStream::new_list(12);
+ let mut response_stream = RlpStream::new_list(3);
- response_stream.append(&req_id).append(&new_buf);
+ response_stream.append(&req_id).append(&new_buf).begin_list(10);
for body in bodies {
response_stream.append_raw(&body, 1);
}
@@ -399,9 +408,9 @@ fn get_block_receipts() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Receipts, receipts.len());
- let mut response_stream = RlpStream::new_list(2 + receipts.len());
+ let mut response_stream = RlpStream::new_list(3);
- response_stream.append(&req_id).append(&new_buf);
+ response_stream.append(&req_id).append(&new_buf).begin_list(receipts.len());
for block_receipts in receipts {
response_stream.append_raw(&block_receipts, 1);
}
@@ -448,9 +457,9 @@ fn get_state_proofs() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::StateProofs, 2);
- let mut response_stream = RlpStream::new_list(4);
+ let mut response_stream = RlpStream::new_list(3);
- response_stream.append(&req_id).append(&new_buf);
+ response_stream.append(&req_id).append(&new_buf).begin_list(2);
for proof in proofs {
response_stream.append_raw(&proof, 1);
}
@@ -497,9 +506,9 @@ fn get_contract_code() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Codes, 2);
- let mut response_stream = RlpStream::new_list(4);
+ let mut response_stream = RlpStream::new_list(3);
- response_stream.append(&req_id).append(&new_buf);
+ response_stream.append(&req_id).append(&new_buf).begin_list(2);
for code in codes {
response_stream.append(&code);
}
diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs
index ad8d8ea16..ed2f49f5d 100644
--- a/ethcore/light/src/provider.rs
+++ b/ethcore/light/src/provider.rs
@@ -97,17 +97,29 @@ impl Provider for T {
}
fn block_headers(&self, req: request::Headers) -> Vec {
+ use request::HashOrNumber;
+ use ethcore::views::HeaderView;
+
let best_num = self.chain_info().best_block_number;
- let start_num = req.block_num;
+ let start_num = match req.start {
+ HashOrNumber::Number(start_num) => start_num,
+ HashOrNumber::Hash(hash) => match self.block_header(BlockId::Hash(hash)) {
+ None => {
+ trace!(target: "les_provider", "Unknown block hash {} requested", hash);
+ return Vec::new();
+ }
+ Some(header) => {
+ let num = HeaderView::new(&header).number();
+ if req.max == 1 || self.block_hash(BlockId::Number(num)) != Some(hash) {
+ // Non-canonical header or single header requested.
+ return vec![header];
+ }
- match self.block_hash(BlockId::Number(req.block_num)) {
- Some(hash) if hash == req.block_hash => {}
- _=> {
- trace!(target: "les_provider", "unknown/non-canonical start block in header request: {:?}", (req.block_num, req.block_hash));
- return vec![]
+ num
+ }
}
- }
-
+ };
+
(0u64..req.max as u64)
.map(|x: u64| x.saturating_mul(req.skip + 1))
.take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num >= *x })
diff --git a/ethcore/light/src/types/les_request.rs b/ethcore/light/src/types/les_request.rs
index 49bd2e9cc..2c7bfb380 100644
--- a/ethcore/light/src/types/les_request.rs
+++ b/ethcore/light/src/types/les_request.rs
@@ -18,15 +18,34 @@
use util::H256;
+/// Either a hash or a number.
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[cfg_attr(feature = "ipc", derive(Binary))]
+pub enum HashOrNumber {
+ /// Block hash variant.
+ Hash(H256),
+ /// Block number variant.
+ Number(u64),
+}
+
+impl From for HashOrNumber {
+ fn from(hash: H256) -> Self {
+ HashOrNumber::Hash(hash)
+ }
+}
+
+impl From for HashOrNumber {
+ fn from(num: u64) -> Self {
+ HashOrNumber::Number(num)
+ }
+}
+
/// A request for block headers.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct Headers {
- /// Starting block number
- pub block_num: u64,
- /// Starting block hash. This and number could be combined but IPC codegen is
- /// not robust enough to support it.
- pub block_hash: H256,
+ /// Starting block number or hash.
+ pub start: HashOrNumber,
/// The maximum amount of headers which can be returned.
pub max: usize,
/// The amount of headers to skip between each response entry.