Merge branch 'master' into on-demand-les-request

This commit is contained in:
Robert Habermeier
2017-01-12 11:23:47 +01:00
120 changed files with 2002 additions and 1238 deletions

View File

@@ -23,7 +23,7 @@
//! This module provides an interface for configuration of buffer
//! flow costs and recharge rates.
//!
//! Current default costs are picked completely arbitrarily, not based
//! Current default costs are picked completely arbitrarily, not based
//! on any empirical timings or mathematical models.
use request;
@@ -184,6 +184,23 @@ impl FlowParams {
}
}
/// Create effectively infinite flow params.
pub fn free() -> Self {
let free_cost = Cost(0.into(), 0.into());
FlowParams {
limit: (!0u64).into(),
recharge: 1.into(),
costs: CostTable {
headers: free_cost.clone(),
bodies: free_cost.clone(),
receipts: free_cost.clone(),
state_proofs: free_cost.clone(),
contract_codes: free_cost.clone(),
header_proofs: free_cost.clone(),
}
}
}
/// Get a reference to the buffer limit.
pub fn limit(&self) -> &U256 { &self.limit }
@@ -209,7 +226,7 @@ impl FlowParams {
cost.0 + (amount * cost.1)
}
/// Compute the maximum number of costs of a specific kind which can be made
/// Compute the maximum number of costs of a specific kind which can be made
/// with the given buffer.
/// Saturates at `usize::max()`. This is not a problem in practice because
/// this amount of requests is already prohibitively large.
@@ -317,4 +334,4 @@ mod tests {
assert_eq!(buffer.estimate, 100.into());
}
}
}

View File

@@ -41,7 +41,6 @@ use self::buffer_flow::{Buffer, FlowParams};
use self::context::{Ctx, TickCtx};
use self::error::Punishment;
mod buffer_flow;
mod context;
mod error;
mod status;
@@ -49,6 +48,8 @@ mod status;
#[cfg(test)]
mod tests;
pub mod buffer_flow;
pub use self::error::Error;
pub use self::context::{BasicContext, EventContext, IoContext};
pub use self::status::{Status, Capabilities, Announcement};
@@ -244,7 +245,7 @@ pub struct LightProtocol {
pending_requests: RwLock<HashMap<usize, Requested>>,
capabilities: RwLock<Capabilities>,
flow_params: FlowParams, // assumed static and same for every peer.
handlers: Vec<Box<Handler>>,
handlers: Vec<Arc<Handler>>,
req_id: AtomicUsize,
}
@@ -383,11 +384,11 @@ impl LightProtocol {
}
/// Add an event handler.
/// Ownership will be transferred to the protocol structure,
/// and the handler will be kept alive as long as it is.
///
/// These are intended to be added when the protocol structure
/// is initialized as a means of customizing its behavior.
pub fn add_handler(&mut self, handler: Box<Handler>) {
/// is initialized as a means of customizing its behavior,
/// and dispatching requests immediately upon events.
pub fn add_handler(&mut self, handler: Arc<Handler>) {
self.handlers.push(handler);
}
@@ -447,8 +448,10 @@ impl LightProtocol {
}
}
// handle a packet using the given io context.
fn handle_packet(&self, io: &IoContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
/// Handle an LES packet using the given io context.
/// Packet data is _untrusted_, which means that invalid data won't lead to
/// issues.
pub fn handle_packet(&self, io: &IoContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
let rlp = UntrustedRlp::new(data);
trace!(target: "les", "Incoming packet {} from peer {}", packet_id, peer);
@@ -488,6 +491,71 @@ impl LightProtocol {
}
}
/// called when a peer connects.
pub fn on_connect(&self, peer: &PeerId, io: &IoContext) {
let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) {
Ok(pv) => pv,
Err(e) => { punish(*peer, io, e); return }
};
if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() {
punish(*peer, io, Error::UnsupportedProtocolVersion(proto_version));
return;
}
let chain_info = self.provider.chain_info();
let status = Status {
head_td: chain_info.total_difficulty,
head_hash: chain_info.best_block_hash,
head_num: chain_info.best_block_number,
genesis_hash: chain_info.genesis_hash,
protocol_version: proto_version as u32, // match peer proto version
network_id: self.network_id,
last_head: None,
};
let capabilities = self.capabilities.read().clone();
let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params));
self.pending_peers.write().insert(*peer, PendingPeer {
sent_head: chain_info.best_block_hash,
last_update: SteadyTime::now(),
});
io.send(*peer, packet::STATUS, status_packet);
}
/// called when a peer disconnects.
pub fn on_disconnect(&self, peer: PeerId, io: &IoContext) {
trace!(target: "les", "Peer {} disconnecting", peer);
self.pending_peers.write().remove(&peer);
if self.peers.write().remove(&peer).is_some() {
let unfulfilled: Vec<_> = self.pending_requests.read()
.iter()
.filter(|&(_, r)| r.peer_id == peer)
.map(|(&id, _)| ReqId(id))
.collect();
{
let mut pending = self.pending_requests.write();
for &ReqId(ref inner) in &unfulfilled {
pending.remove(inner);
}
}
for handler in &self.handlers {
handler.on_disconnect(&Ctx {
peer: peer,
io: io,
proto: self,
}, &unfulfilled)
}
}
}
// check timeouts and punish peers.
fn timeout_check(&self, io: &IoContext) {
let now = SteadyTime::now();
@@ -536,6 +604,16 @@ impl LightProtocol {
}
}
/// Execute the given closure with a basic context derived from the I/O context.
pub fn with_context<F, T>(&self, io: &IoContext, f: F) -> T
where F: FnOnce(&BasicContext) -> T
{
f(&TickCtx {
io: io,
proto: self,
})
}
fn tick_handlers(&self, io: &IoContext) {
for handler in &self.handlers {
handler.tick(&TickCtx {
@@ -547,71 +625,6 @@ impl LightProtocol {
}
impl LightProtocol {
// called when a peer connects.
fn on_connect(&self, peer: &PeerId, io: &IoContext) {
let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) {
Ok(pv) => pv,
Err(e) => { punish(*peer, io, e); return }
};
if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() {
punish(*peer, io, Error::UnsupportedProtocolVersion(proto_version));
return;
}
let chain_info = self.provider.chain_info();
let status = Status {
head_td: chain_info.total_difficulty,
head_hash: chain_info.best_block_hash,
head_num: chain_info.best_block_number,
genesis_hash: chain_info.genesis_hash,
protocol_version: proto_version as u32, // match peer proto version
network_id: self.network_id,
last_head: None,
};
let capabilities = self.capabilities.read().clone();
let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params));
self.pending_peers.write().insert(*peer, PendingPeer {
sent_head: chain_info.best_block_hash,
last_update: SteadyTime::now(),
});
io.send(*peer, packet::STATUS, status_packet);
}
// called when a peer disconnects.
fn on_disconnect(&self, peer: PeerId, io: &IoContext) {
trace!(target: "les", "Peer {} disconnecting", peer);
self.pending_peers.write().remove(&peer);
if self.peers.write().remove(&peer).is_some() {
let unfulfilled: Vec<_> = self.pending_requests.read()
.iter()
.filter(|&(_, r)| r.peer_id == peer)
.map(|(&id, _)| ReqId(id))
.collect();
{
let mut pending = self.pending_requests.write();
for &ReqId(ref inner) in &unfulfilled {
pending.remove(inner);
}
}
for handler in &self.handlers {
handler.on_disconnect(&Ctx {
peer: peer,
io: io,
proto: self,
}, &unfulfilled)
}
}
}
// Handle status message from peer.
fn status(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
let pending = match self.pending_peers.write().remove(peer) {

View File

@@ -18,7 +18,7 @@
//! These don't test of the higher level logic on top of
use ethcore::blockchain_info::BlockChainInfo;
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient};
use ethcore::client::{EachBlockWith, TestBlockChainClient};
use ethcore::ids::BlockId;
use ethcore::transaction::PendingTransaction;
use ethcore::encoded;
@@ -88,7 +88,7 @@ impl Provider for TestProvider {
}
fn reorg_depth(&self, a: &H256, b: &H256) -> Option<u64> {
self.0.client.tree_route(a, b).map(|route| route.index as u64)
self.0.client.reorg_depth(a, b)
}
fn earliest_state(&self) -> Option<u64> {
@@ -305,7 +305,9 @@ fn get_block_bodies() {
}
let request = request::Bodies {
block_hashes: (0..10).map(|i| provider.client.block_hash(BlockId::Number(i)).unwrap()).collect(),
block_hashes: (0..10).map(|i|
provider.client.block_header(BlockId::Number(i)).unwrap().hash()
).collect()
};
let req_id = 111;
@@ -353,8 +355,9 @@ fn get_block_receipts() {
// find the first 10 block hashes starting with `f` because receipts are only provided
// by the test client in that case.
let block_hashes: Vec<_> = (0..1000).map(|i| provider.client.block_hash(BlockId::Number(i)).unwrap())
.filter(|hash| format!("{}", hash).starts_with("f")).take(10).collect();
let block_hashes: Vec<_> = (0..1000).map(|i|
provider.client.block_header(BlockId::Number(i)).unwrap().hash()
).filter(|hash| format!("{}", hash).starts_with("f")).take(10).collect();
let request = request::Receipts {
block_hashes: block_hashes.clone(),