complete initial request changes
This commit is contained in:
@@ -14,10 +14,9 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! LES Protocol Version 1 implementation.
|
||||
//! PIP Protocol Version 1 implementation.
|
||||
//!
|
||||
//! This uses a "Provider" to answer requests.
|
||||
//! See https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES)
|
||||
|
||||
use ethcore::transaction::{Action, UnverifiedTransaction};
|
||||
use ethcore::receipt::Receipt;
|
||||
@@ -35,7 +34,7 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use provider::Provider;
|
||||
use request::{self, HashOrNumber, Request};
|
||||
use request::{self, HashOrNumber, Request, Response};
|
||||
|
||||
use self::request_credits::{Credits, FlowParams};
|
||||
use self::context::{Ctx, TickCtx};
|
||||
@@ -83,43 +82,24 @@ mod packet {
|
||||
// announcement of new block hashes or capabilities.
|
||||
pub const ANNOUNCE: u8 = 0x01;
|
||||
|
||||
// request and response for block headers
|
||||
pub const GET_BLOCK_HEADERS: u8 = 0x02;
|
||||
pub const BLOCK_HEADERS: u8 = 0x03;
|
||||
|
||||
// request and response for block bodies
|
||||
pub const GET_BLOCK_BODIES: u8 = 0x04;
|
||||
pub const BLOCK_BODIES: u8 = 0x05;
|
||||
|
||||
// request and response for transaction receipts.
|
||||
pub const GET_RECEIPTS: u8 = 0x06;
|
||||
pub const RECEIPTS: u8 = 0x07;
|
||||
|
||||
// request and response for merkle proofs.
|
||||
pub const GET_PROOFS: u8 = 0x08;
|
||||
pub const PROOFS: u8 = 0x09;
|
||||
|
||||
// request and response for contract code.
|
||||
pub const GET_CONTRACT_CODES: u8 = 0x0a;
|
||||
pub const CONTRACT_CODES: u8 = 0x0b;
|
||||
// request and response.
|
||||
pub const REQUEST: u8 = 0x02;
|
||||
pub const RESPONSE: u8 = 0x03;
|
||||
|
||||
// relay transactions to peers.
|
||||
pub const SEND_TRANSACTIONS: u8 = 0x0c;
|
||||
|
||||
// request and response for header proofs in a CHT.
|
||||
pub const GET_HEADER_PROOFS: u8 = 0x0d;
|
||||
pub const HEADER_PROOFS: u8 = 0x0e;
|
||||
pub const SEND_TRANSACTIONS: u8 = 0x04;
|
||||
|
||||
// request and response for transaction proof.
|
||||
pub const GET_TRANSACTION_PROOF: u8 = 0x0f;
|
||||
pub const TRANSACTION_PROOF: u8 = 0x10;
|
||||
// TODO: merge with request/response.
|
||||
pub const GET_TRANSACTION_PROOF: u8 = 0x05;
|
||||
pub const TRANSACTION_PROOF: u8 = 0x06;
|
||||
}
|
||||
|
||||
// timeouts for different kinds of requests. all values are in milliseconds.
|
||||
// TODO: variable timeouts based on request count.
|
||||
mod timeout {
|
||||
pub const HANDSHAKE: i64 = 2500;
|
||||
pub const HEADERS: i64 = 5000;
|
||||
pub const HEADERS: i64 = 2500;
|
||||
pub const BODIES: i64 = 5000;
|
||||
pub const RECEIPTS: i64 = 3500;
|
||||
pub const PROOFS: i64 = 4000;
|
||||
@@ -159,17 +139,6 @@ pub struct Peer {
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
// check the maximum cost of a request, returning an error if there's
|
||||
// not enough credits left.
|
||||
// returns the calculated maximum cost.
|
||||
fn deduct_max(&mut self, flow_params: &FlowParams, kind: request::Kind, max: usize) -> Result<U256, Error> {
|
||||
flow_params.recharge(&mut self.local_credits);
|
||||
|
||||
let max_cost = flow_params.compute_cost(kind, max);
|
||||
self.local_credits.deduct_cost(max_cost)?;
|
||||
Ok(max_cost)
|
||||
}
|
||||
|
||||
// refund credits for a request. returns new amount of credits.
|
||||
fn refund(&mut self, flow_params: &FlowParams, amount: U256) -> U256 {
|
||||
flow_params.refund(&mut self.local_credits, amount);
|
||||
@@ -197,20 +166,8 @@ pub trait Handler: Send + Sync {
|
||||
fn on_announcement(&self, _ctx: &EventContext, _announcement: &Announcement) { }
|
||||
/// Called when a peer requests relay of some transactions.
|
||||
fn on_transactions(&self, _ctx: &EventContext, _relay: &[UnverifiedTransaction]) { }
|
||||
/// Called when a peer responds with block bodies.
|
||||
fn on_block_bodies(&self, _ctx: &EventContext, _req_id: ReqId, _bodies: &[Bytes]) { }
|
||||
/// Called when a peer responds with block headers.
|
||||
fn on_block_headers(&self, _ctx: &EventContext, _req_id: ReqId, _headers: &[Bytes]) { }
|
||||
/// Called when a peer responds with block receipts.
|
||||
fn on_receipts(&self, _ctx: &EventContext, _req_id: ReqId, _receipts: &[Vec<Receipt>]) { }
|
||||
/// Called when a peer responds with state proofs. Each proof should be a series of trie
|
||||
/// nodes in ascending order by distance from the root.
|
||||
fn on_state_proofs(&self, _ctx: &EventContext, _req_id: ReqId, _proofs: &[Vec<Bytes>]) { }
|
||||
/// Called when a peer responds with contract code.
|
||||
fn on_code(&self, _ctx: &EventContext, _req_id: ReqId, _codes: &[Bytes]) { }
|
||||
/// Called when a peer responds with header proofs. Each proof should be a block header coupled
|
||||
/// with a series of trie nodes is ascending order by distance from the root.
|
||||
fn on_header_proofs(&self, _ctx: &EventContext, _req_id: ReqId, _proofs: &[(Bytes, Vec<Bytes>)]) { }
|
||||
/// Called when a peer responds to requests.
|
||||
fn on_responses(&self, _ctx: &EventContext, _req_id: ReqId, _relay: &[Response]) { }
|
||||
/// Called when a peer responds with a transaction proof. Each proof is a vector of state items.
|
||||
fn on_transaction_proof(&self, _ctx: &EventContext, _req_id: ReqId, _state_items: &[DBValue]) { }
|
||||
/// Called to "tick" the handler periodically.
|
||||
@@ -307,7 +264,7 @@ pub struct LightProtocol {
|
||||
impl LightProtocol {
|
||||
/// Create a new instance of the protocol manager.
|
||||
pub fn new(provider: Arc<Provider>, params: Params) -> Self {
|
||||
debug!(target: "les", "Initializing LES handler");
|
||||
debug!(target: "pip", "Initializing light protocol handler");
|
||||
|
||||
let genesis_hash = provider.chain_info().genesis_hash;
|
||||
LightProtocol {
|
||||
@@ -339,62 +296,15 @@ impl LightProtocol {
|
||||
)
|
||||
}
|
||||
|
||||
/// Check the maximum amount of requests of a specific type
|
||||
/// which a peer would be able to serve. Returns zero if the
|
||||
/// peer is unknown or has no credit parameters.
|
||||
fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize {
|
||||
self.peers.read().get(&peer).and_then(|peer| {
|
||||
let mut peer = peer.lock();
|
||||
match peer.remote_flow {
|
||||
Some((ref mut c, ref flow)) => {
|
||||
flow.recharge(c);
|
||||
Some(flow.max_amount(&*c, kind))
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}).unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Make a request to a peer.
|
||||
///
|
||||
/// Fails on: nonexistent peer, network error, peer not server,
|
||||
/// insufficient credits. Does not check capabilities before sending.
|
||||
/// On success, returns a request id which can later be coordinated
|
||||
/// with an event.
|
||||
// TODO: pass `Requests`.
|
||||
pub fn request_from(&self, io: &IoContext, peer_id: &PeerId, request: Request) -> Result<ReqId, Error> {
|
||||
let peers = self.peers.read();
|
||||
let peer = peers.get(peer_id).ok_or_else(|| Error::UnknownPeer)?;
|
||||
let mut peer = peer.lock();
|
||||
|
||||
match peer.remote_flow {
|
||||
Some((ref mut c, ref flow)) => {
|
||||
flow.recharge(c);
|
||||
let max = flow.compute_cost(request.kind(), request.amount());
|
||||
c.deduct_cost(max)?;
|
||||
}
|
||||
None => return Err(Error::NotServer),
|
||||
}
|
||||
|
||||
let req_id = self.req_id.fetch_add(1, Ordering::SeqCst);
|
||||
let packet_data = encode_request(&request, req_id);
|
||||
|
||||
trace!(target: "les", "Dispatching request {} to peer {}", req_id, peer_id);
|
||||
|
||||
let packet_id = match request.kind() {
|
||||
request::Kind::Headers => packet::GET_BLOCK_HEADERS,
|
||||
request::Kind::Bodies => packet::GET_BLOCK_BODIES,
|
||||
request::Kind::Receipts => packet::GET_RECEIPTS,
|
||||
request::Kind::StateProofs => packet::GET_PROOFS,
|
||||
request::Kind::Codes => packet::GET_CONTRACT_CODES,
|
||||
request::Kind::HeaderProofs => packet::GET_HEADER_PROOFS,
|
||||
request::Kind::TransactionProof => packet::GET_TRANSACTION_PROOF,
|
||||
};
|
||||
|
||||
io.send(*peer_id, packet_id, packet_data);
|
||||
|
||||
peer.pending_requests.insert(ReqId(req_id), request, SteadyTime::now());
|
||||
|
||||
Ok(ReqId(req_id))
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
/// Make an announcement of new chain head and capabilities to all peers.
|
||||
@@ -427,7 +337,7 @@ impl LightProtocol {
|
||||
None => {
|
||||
// both values will always originate locally -- this means something
|
||||
// has gone really wrong
|
||||
debug!(target: "les", "couldn't compute reorganization depth between {:?} and {:?}",
|
||||
debug!(target: "pip", "couldn't compute reorganization depth between {:?} and {:?}",
|
||||
&announcement.head_hash, &peer_info.sent_head);
|
||||
0
|
||||
}
|
||||
@@ -474,11 +384,10 @@ impl LightProtocol {
|
||||
let req_id = ReqId(raw.val_at(0)?);
|
||||
let cur_credits: U256 = raw.val_at(1)?;
|
||||
|
||||
trace!(target: "les", "pre-verifying response from peer {}, kind={:?}", peer, kind);
|
||||
trace!(target: "pip", "pre-verifying response from peer {}, kind={:?}", peer, kind);
|
||||
|
||||
let mut had_req = false;
|
||||
let peers = self.peers.read();
|
||||
let maybe_err = match peers.get(peer) {
|
||||
let res = match peers.get(peer) {
|
||||
Some(peer_info) => {
|
||||
let mut peer_info = peer_info.lock();
|
||||
let req_info = peer_info.pending_requests.remove(&req_id, SteadyTime::now());
|
||||
@@ -486,69 +395,37 @@ impl LightProtocol {
|
||||
|
||||
match (req_info, flow_info) {
|
||||
(Some(request), Some(flow_info)) => {
|
||||
had_req = true;
|
||||
|
||||
let &mut (ref mut c, ref mut flow) = flow_info;
|
||||
let actual_credits = ::std::cmp::min(cur_credits, *flow.limit());
|
||||
c.update_to(actual_credits);
|
||||
|
||||
if request.kind() != kind {
|
||||
Some(Error::UnsolicitedResponse)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
(None, _) => Some(Error::UnsolicitedResponse),
|
||||
(_, None) => Some(Error::NotServer), // really should be impossible.
|
||||
(None, _) => Err(Error::UnsolicitedResponse),
|
||||
(_, None) => Err(Error::NotServer), // really should be impossible.
|
||||
}
|
||||
}
|
||||
None => Some(Error::UnknownPeer), // probably only occurs in a race of some kind.
|
||||
None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind.
|
||||
};
|
||||
|
||||
if had_req {
|
||||
let id_guard = IdGuard::new(peers, *peer, req_id);
|
||||
match maybe_err {
|
||||
Some(err) => Err(err),
|
||||
None => Ok(id_guard)
|
||||
}
|
||||
} else {
|
||||
Err(maybe_err.expect("every branch without a request leads to error; qed"))
|
||||
}
|
||||
res.map(|_| IdGuard::new(peers, *peer, req_id))
|
||||
}
|
||||
|
||||
/// Handle an LES packet using the given io context.
|
||||
/// Handle a packet using the given io context.
|
||||
/// Packet data is _untrusted_, which means that invalid data won't lead to
|
||||
/// issues.
|
||||
pub fn handle_packet(&self, io: &IoContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
||||
let rlp = UntrustedRlp::new(data);
|
||||
|
||||
trace!(target: "les", "Incoming packet {} from peer {}", packet_id, peer);
|
||||
trace!(target: "pip", "Incoming packet {} from peer {}", packet_id, peer);
|
||||
|
||||
// handle the packet
|
||||
let res = match packet_id {
|
||||
packet::STATUS => self.status(peer, io, rlp),
|
||||
packet::ANNOUNCE => self.announcement(peer, io, rlp),
|
||||
|
||||
packet::GET_BLOCK_HEADERS => self.get_block_headers(peer, io, rlp),
|
||||
packet::BLOCK_HEADERS => self.block_headers(peer, io, rlp),
|
||||
|
||||
packet::GET_BLOCK_BODIES => self.get_block_bodies(peer, io, rlp),
|
||||
packet::BLOCK_BODIES => self.block_bodies(peer, io, rlp),
|
||||
|
||||
packet::GET_RECEIPTS => self.get_receipts(peer, io, rlp),
|
||||
packet::RECEIPTS => self.receipts(peer, io, rlp),
|
||||
|
||||
packet::GET_PROOFS => self.get_proofs(peer, io, rlp),
|
||||
packet::PROOFS => self.proofs(peer, io, rlp),
|
||||
|
||||
packet::GET_CONTRACT_CODES => self.get_contract_code(peer, io, rlp),
|
||||
packet::CONTRACT_CODES => self.contract_code(peer, io, rlp),
|
||||
|
||||
packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp),
|
||||
packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp),
|
||||
|
||||
packet::GET_TRANSACTION_PROOF => self.get_transaction_proof(peer, io, rlp),
|
||||
packet::TRANSACTION_PROOF => self.transaction_proof(peer, io, rlp),
|
||||
packet::REQUEST => self.request(peer, io, rlp),
|
||||
packet::RESPONSE => self.response(peer, io, rlp),
|
||||
|
||||
packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp),
|
||||
|
||||
@@ -577,7 +454,7 @@ impl LightProtocol {
|
||||
.collect();
|
||||
|
||||
for slowpoke in slowpokes {
|
||||
debug!(target: "les", "Peer {} handshake timed out", slowpoke);
|
||||
debug!(target: "pip", "Peer {} handshake timed out", slowpoke);
|
||||
pending.remove(&slowpoke);
|
||||
io.disconnect_peer(slowpoke);
|
||||
}
|
||||
@@ -587,7 +464,7 @@ impl LightProtocol {
|
||||
{
|
||||
for (peer_id, peer) in self.peers.read().iter() {
|
||||
if peer.lock().pending_requests.check_timeout(now) {
|
||||
debug!(target: "les", "Peer {} request timeout", peer_id);
|
||||
debug!(target: "pip", "Peer {} request timeout", peer_id);
|
||||
io.disconnect_peer(*peer_id);
|
||||
}
|
||||
}
|
||||
@@ -631,7 +508,7 @@ impl LightProtocol {
|
||||
|
||||
/// called when a peer disconnects.
|
||||
pub fn on_disconnect(&self, peer: PeerId, io: &IoContext) {
|
||||
trace!(target: "les", "Peer {} disconnecting", peer);
|
||||
trace!(target: "pip", "Peer {} disconnecting", peer);
|
||||
|
||||
self.pending_peers.write().remove(&peer);
|
||||
let unfulfilled = match self.peers.write().remove(&peer) {
|
||||
@@ -686,7 +563,7 @@ impl LightProtocol {
|
||||
|
||||
let (status, capabilities, flow_params) = status::parse_handshake(data)?;
|
||||
|
||||
trace!(target: "les", "Connected peer with chain head {:?}", (status.head_hash, status.head_num));
|
||||
trace!(target: "pip", "Connected peer with chain head {:?}", (status.head_hash, status.head_num));
|
||||
|
||||
if (status.network_id, status.genesis_hash) != (self.network_id, self.genesis_hash) {
|
||||
return Err(Error::WrongNetwork);
|
||||
@@ -723,7 +600,7 @@ impl LightProtocol {
|
||||
// Handle an announcement.
|
||||
fn announcement(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
|
||||
if !self.peers.read().contains_key(peer) {
|
||||
debug!(target: "les", "Ignoring announcement from unknown peer");
|
||||
debug!(target: "pip", "Ignoring announcement from unknown peer");
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
@@ -765,447 +642,19 @@ impl LightProtocol {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Handle a request for block headers.
|
||||
fn get_block_headers(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
|
||||
const MAX_HEADERS: usize = 512;
|
||||
|
||||
let peers = self.peers.read();
|
||||
let peer = match peers.get(peer) {
|
||||
Some(peer) => peer,
|
||||
None => {
|
||||
debug!(target: "les", "Ignoring request from unknown peer");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
|
||||
let mut peer = peer.lock();
|
||||
|
||||
let req_id: u64 = data.val_at(0)?;
|
||||
let data = data.at(1)?;
|
||||
|
||||
let start_block = {
|
||||
if data.at(0)?.size() == 32 {
|
||||
HashOrNumber::Hash(data.val_at(0)?)
|
||||
} else {
|
||||
HashOrNumber::Number(data.val_at(0)?)
|
||||
}
|
||||
};
|
||||
|
||||
let req = request::Headers {
|
||||
start: start_block,
|
||||
max: ::std::cmp::min(MAX_HEADERS, data.val_at(1)?),
|
||||
skip: data.val_at(2)?,
|
||||
reverse: data.val_at(3)?,
|
||||
};
|
||||
|
||||
let max_cost = peer.deduct_max(&self.flow_params, request::Kind::Headers, req.max)?;
|
||||
|
||||
let response = self.provider.block_headers(req);
|
||||
let actual_cost = self.flow_params.compute_cost(request::Kind::Headers, response.len());
|
||||
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
|
||||
|
||||
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
io.respond(packet::BLOCK_HEADERS, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for header in response {
|
||||
stream.append_raw(&header.into_inner(), 1);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Receive a response for block headers.
|
||||
fn block_headers(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||
let id_guard = self.pre_verify_response(peer, request::Kind::Headers, &raw)?;
|
||||
let raw_headers: Vec<_> = raw.at(2)?.iter().map(|x| x.as_raw().to_owned()).collect();
|
||||
|
||||
let req_id = id_guard.defuse();
|
||||
for handler in &self.handlers {
|
||||
handler.on_block_headers(&Ctx {
|
||||
peer: *peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
}, req_id, &raw_headers);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Handle a request for block bodies.
|
||||
fn get_block_bodies(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
|
||||
const MAX_BODIES: usize = 256;
|
||||
|
||||
let peers = self.peers.read();
|
||||
let peer = match peers.get(peer) {
|
||||
Some(peer) => peer,
|
||||
None => {
|
||||
debug!(target: "les", "Ignoring request from unknown peer");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
let mut peer = peer.lock();
|
||||
|
||||
let req_id: u64 = data.val_at(0)?;
|
||||
|
||||
let req = request::Bodies {
|
||||
block_hashes: data.at(1)?.iter()
|
||||
.take(MAX_BODIES)
|
||||
.map(|x| x.as_val())
|
||||
.collect::<Result<_, _>>()?
|
||||
};
|
||||
|
||||
let max_cost = peer.deduct_max(&self.flow_params, request::Kind::Bodies, req.block_hashes.len())?;
|
||||
|
||||
let response = self.provider.block_bodies(req);
|
||||
let response_len = response.iter().filter(|x| x.is_some()).count();
|
||||
let actual_cost = self.flow_params.compute_cost(request::Kind::Bodies, response_len);
|
||||
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
|
||||
|
||||
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
|
||||
io.respond(packet::BLOCK_BODIES, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for body in response {
|
||||
match body {
|
||||
Some(body) => stream.append_raw(&body.into_inner(), 1),
|
||||
None => stream.append_empty_data(),
|
||||
};
|
||||
}
|
||||
|
||||
stream.out()
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Receive a response for block bodies.
|
||||
fn block_bodies(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||
let id_guard = self.pre_verify_response(peer, request::Kind::Bodies, &raw)?;
|
||||
let raw_bodies: Vec<Bytes> = raw.at(2)?.iter().map(|x| x.as_raw().to_owned()).collect();
|
||||
|
||||
let req_id = id_guard.defuse();
|
||||
for handler in &self.handlers {
|
||||
handler.on_block_bodies(&Ctx {
|
||||
peer: *peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
}, req_id, &raw_bodies);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Handle a request for receipts.
|
||||
fn get_receipts(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
|
||||
const MAX_RECEIPTS: usize = 256;
|
||||
|
||||
let peers = self.peers.read();
|
||||
let peer = match peers.get(peer) {
|
||||
Some(peer) => peer,
|
||||
None => {
|
||||
debug!(target: "les", "Ignoring request from unknown peer");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
let mut peer = peer.lock();
|
||||
|
||||
let req_id: u64 = data.val_at(0)?;
|
||||
|
||||
let req = request::Receipts {
|
||||
block_hashes: data.at(1)?.iter()
|
||||
.take(MAX_RECEIPTS)
|
||||
.map(|x| x.as_val())
|
||||
.collect::<Result<_,_>>()?
|
||||
};
|
||||
|
||||
let max_cost = peer.deduct_max(&self.flow_params, request::Kind::Receipts, req.block_hashes.len())?;
|
||||
|
||||
let response = self.provider.receipts(req);
|
||||
let response_len = response.iter().filter(|x| &x[..] != &::rlp::EMPTY_LIST_RLP).count();
|
||||
let actual_cost = self.flow_params.compute_cost(request::Kind::Receipts, response_len);
|
||||
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
|
||||
|
||||
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
|
||||
io.respond(packet::RECEIPTS, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for receipts in response {
|
||||
stream.append_raw(&receipts, 1);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Receive a response for receipts.
|
||||
fn receipts(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||
let id_guard = self.pre_verify_response(peer, request::Kind::Receipts, &raw)?;
|
||||
let raw_receipts: Vec<Vec<Receipt>> = raw.at(2)?
|
||||
.iter()
|
||||
.map(|x| x.as_val())
|
||||
.collect::<Result<_,_>>()?;
|
||||
|
||||
let req_id = id_guard.defuse();
|
||||
for handler in &self.handlers {
|
||||
handler.on_receipts(&Ctx {
|
||||
peer: *peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
}, req_id, &raw_receipts);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Handle a request for proofs.
|
||||
fn get_proofs(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
|
||||
const MAX_PROOFS: usize = 128;
|
||||
|
||||
let peers = self.peers.read();
|
||||
let peer = match peers.get(peer) {
|
||||
Some(peer) => peer,
|
||||
None => {
|
||||
debug!(target: "les", "Ignoring request from unknown peer");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
let mut peer = peer.lock();
|
||||
|
||||
let req_id: u64 = data.val_at(0)?;
|
||||
|
||||
let req = {
|
||||
let requests: Result<Vec<_>, Error> = data.at(1)?.iter().take(MAX_PROOFS).map(|x| {
|
||||
Ok(request::StateProof {
|
||||
block: x.val_at(0)?,
|
||||
key1: x.val_at(1)?,
|
||||
key2: if x.at(2)?.is_empty() { None } else { Some(x.val_at(2)?) },
|
||||
from_level: x.val_at(3)?,
|
||||
})
|
||||
}).collect();
|
||||
|
||||
request::StateProofs {
|
||||
requests: requests?,
|
||||
}
|
||||
};
|
||||
|
||||
let max_cost = peer.deduct_max(&self.flow_params, request::Kind::StateProofs, req.requests.len())?;
|
||||
|
||||
let response = self.provider.proofs(req);
|
||||
let response_len = response.iter().filter(|x| &x[..] != &::rlp::EMPTY_LIST_RLP).count();
|
||||
let actual_cost = self.flow_params.compute_cost(request::Kind::StateProofs, response_len);
|
||||
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
|
||||
|
||||
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
|
||||
io.respond(packet::PROOFS, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for proof in response {
|
||||
stream.append_raw(&proof, 1);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Receive a response for proofs.
|
||||
fn proofs(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||
let id_guard = self.pre_verify_response(peer, request::Kind::StateProofs, &raw)?;
|
||||
|
||||
let raw_proofs: Vec<Vec<Bytes>> = raw.at(2)?.iter()
|
||||
.map(|x| x.iter().map(|node| node.as_raw().to_owned()).collect())
|
||||
.collect();
|
||||
|
||||
let req_id = id_guard.defuse();
|
||||
for handler in &self.handlers {
|
||||
handler.on_state_proofs(&Ctx {
|
||||
peer: *peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
}, req_id, &raw_proofs);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Handle a request for contract code.
|
||||
fn get_contract_code(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
|
||||
const MAX_CODES: usize = 256;
|
||||
|
||||
let peers = self.peers.read();
|
||||
let peer = match peers.get(peer) {
|
||||
Some(peer) => peer,
|
||||
None => {
|
||||
debug!(target: "les", "Ignoring request from unknown peer");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
let mut peer = peer.lock();
|
||||
|
||||
let req_id: u64 = data.val_at(0)?;
|
||||
|
||||
let req = {
|
||||
let requests: Result<Vec<_>, Error> = data.at(1)?.iter().take(MAX_CODES).map(|x| {
|
||||
Ok(request::ContractCode {
|
||||
block_hash: x.val_at(0)?,
|
||||
account_key: x.val_at(1)?,
|
||||
})
|
||||
}).collect();
|
||||
|
||||
request::ContractCodes {
|
||||
code_requests: requests?,
|
||||
}
|
||||
};
|
||||
|
||||
let max_cost = peer.deduct_max(&self.flow_params, request::Kind::Codes, req.code_requests.len())?;
|
||||
|
||||
let response = self.provider.contract_codes(req);
|
||||
let response_len = response.iter().filter(|x| !x.is_empty()).count();
|
||||
let actual_cost = self.flow_params.compute_cost(request::Kind::Codes, response_len);
|
||||
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
|
||||
|
||||
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
|
||||
io.respond(packet::CONTRACT_CODES, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for code in response {
|
||||
stream.append(&code);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Receive a response for contract code.
|
||||
fn contract_code(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||
let id_guard = self.pre_verify_response(peer, request::Kind::Codes, &raw)?;
|
||||
|
||||
let raw_code: Vec<Bytes> = raw.at(2)?.iter()
|
||||
.map(|x| x.as_val())
|
||||
.collect::<Result<_,_>>()?;
|
||||
|
||||
let req_id = id_guard.defuse();
|
||||
for handler in &self.handlers {
|
||||
handler.on_code(&Ctx {
|
||||
peer: *peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
}, req_id, &raw_code);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Handle a request for header proofs
|
||||
fn get_header_proofs(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
|
||||
const MAX_PROOFS: usize = 256;
|
||||
|
||||
let peers = self.peers.read();
|
||||
let peer = match peers.get(peer) {
|
||||
Some(peer) => peer,
|
||||
None => {
|
||||
debug!(target: "les", "Ignoring request from unknown peer");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
let mut peer = peer.lock();
|
||||
|
||||
let req_id: u64 = data.val_at(0)?;
|
||||
|
||||
let req = {
|
||||
let requests: Result<Vec<_>, Error> = data.at(1)?.iter().take(MAX_PROOFS).map(|x| {
|
||||
Ok(request::HeaderProof {
|
||||
cht_number: x.val_at(0)?,
|
||||
block_number: x.val_at(1)?,
|
||||
from_level: x.val_at(2)?,
|
||||
})
|
||||
}).collect();
|
||||
|
||||
request::HeaderProofs {
|
||||
requests: requests?,
|
||||
}
|
||||
};
|
||||
|
||||
let max_cost = peer.deduct_max(&self.flow_params, request::Kind::HeaderProofs, req.requests.len())?;
|
||||
|
||||
let response = self.provider.header_proofs(req);
|
||||
let response_len = response.iter().filter(|x| &x[..] != ::rlp::EMPTY_LIST_RLP).count();
|
||||
let actual_cost = self.flow_params.compute_cost(request::Kind::HeaderProofs, response_len);
|
||||
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
|
||||
|
||||
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
|
||||
io.respond(packet::HEADER_PROOFS, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for proof in response {
|
||||
stream.append_raw(&proof, 1);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Receive a response for header proofs
|
||||
fn header_proofs(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||
fn decode_res(raw: UntrustedRlp) -> Result<(Bytes, Vec<Bytes>), ::rlp::DecoderError> {
|
||||
Ok((
|
||||
raw.val_at(0)?,
|
||||
raw.at(1)?.iter().map(|x| x.as_raw().to_owned()).collect(),
|
||||
))
|
||||
}
|
||||
|
||||
let id_guard = self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw)?;
|
||||
let raw_proofs: Vec<_> = raw.at(2)?.iter()
|
||||
.map(decode_res)
|
||||
.collect::<Result<_,_>>()?;
|
||||
|
||||
let req_id = id_guard.defuse();
|
||||
for handler in &self.handlers {
|
||||
handler.on_header_proofs(&Ctx {
|
||||
peer: *peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
}, req_id, &raw_proofs);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Receive a request for proof-of-execution.
|
||||
fn get_transaction_proof(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||
// refuse to execute more than this amount of gas at once.
|
||||
// this is appx. the point at which the proof of execution would no longer fit in
|
||||
// a single Devp2p packet.
|
||||
fn request(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||
// the maximum amount of requests we'll fill in a single packet.
|
||||
const MAX_REQUESTS: usize = 512;
|
||||
// the maximum amount of gas we'll prove execution of in a single packet.
|
||||
const MAX_GAS: usize = 50_000_000;
|
||||
use util::Uint;
|
||||
|
||||
use ::request_builder::RequestBuilder;
|
||||
|
||||
let peers = self.peers.read();
|
||||
let peer = match peers.get(peer) {
|
||||
Some(peer) => peer,
|
||||
None => {
|
||||
debug!(target: "les", "Ignoring request from unknown peer");
|
||||
debug!(target: "pip", "Ignoring request from unknown peer");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
@@ -1213,68 +662,11 @@ impl LightProtocol {
|
||||
|
||||
let req_id: u64 = raw.val_at(0)?;
|
||||
|
||||
let req = {
|
||||
let req_rlp = raw.at(1)?;
|
||||
request::TransactionProof {
|
||||
at: req_rlp.val_at(0)?,
|
||||
from: req_rlp.val_at(1)?,
|
||||
action: if req_rlp.at(2)?.is_empty() {
|
||||
Action::Create
|
||||
} else {
|
||||
Action::Call(req_rlp.val_at(2)?)
|
||||
},
|
||||
gas: ::std::cmp::min(req_rlp.val_at(3)?, MAX_GAS.into()),
|
||||
gas_price: req_rlp.val_at(4)?,
|
||||
value: req_rlp.val_at(5)?,
|
||||
data: req_rlp.val_at(6)?,
|
||||
}
|
||||
};
|
||||
|
||||
// always charge the peer for all the gas.
|
||||
peer.deduct_max(&self.flow_params, request::Kind::TransactionProof, req.gas.low_u64() as usize)?;
|
||||
|
||||
let response = match self.provider.transaction_proof(req) {
|
||||
Some(res) => res,
|
||||
None => vec![],
|
||||
};
|
||||
|
||||
let cur_credits = peer.local_credits.current();
|
||||
|
||||
io.respond(packet::TRANSACTION_PROOF, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for state_item in response {
|
||||
stream.append(&&state_item[..]);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
});
|
||||
|
||||
Ok(())
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
// Receive a response for proof-of-execution.
|
||||
fn transaction_proof(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||
let id_guard = self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw)?;
|
||||
let raw_proof: Vec<DBValue> = raw.at(2)?.iter()
|
||||
.map(|rlp| {
|
||||
let mut db_val = DBValue::new();
|
||||
db_val.append_slice(rlp.data()?);
|
||||
Ok(db_val)
|
||||
})
|
||||
.collect::<Result<Vec<_>, ::rlp::DecoderError>>()?;
|
||||
|
||||
let req_id = id_guard.defuse();
|
||||
for handler in &self.handlers {
|
||||
handler.on_transaction_proof(&Ctx {
|
||||
peer: *peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
}, req_id, &raw_proof);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
fn response(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
// Receive a set of transactions to relay.
|
||||
@@ -1286,7 +678,7 @@ impl LightProtocol {
|
||||
.map(|x| x.as_val::<UnverifiedTransaction>())
|
||||
.collect::<Result<_,_>>()?;
|
||||
|
||||
debug!(target: "les", "Received {} transactions to relay from peer {}", txs.len(), peer);
|
||||
debug!(target: "pip", "Received {} transactions to relay from peer {}", txs.len(), peer);
|
||||
|
||||
for handler in &self.handlers {
|
||||
handler.on_transactions(&Ctx {
|
||||
@@ -1305,11 +697,11 @@ fn punish(peer: PeerId, io: &IoContext, e: Error) {
|
||||
match e.punishment() {
|
||||
Punishment::None => {}
|
||||
Punishment::Disconnect => {
|
||||
debug!(target: "les", "Disconnecting peer {}: {}", peer, e);
|
||||
debug!(target: "pip", "Disconnecting peer {}: {}", peer, e);
|
||||
io.disconnect_peer(peer)
|
||||
}
|
||||
Punishment::Disable => {
|
||||
debug!(target: "les", "Disabling peer {}: {}", peer, e);
|
||||
debug!(target: "pip", "Disabling peer {}: {}", peer, e);
|
||||
io.disable_peer(peer)
|
||||
}
|
||||
}
|
||||
@@ -1339,112 +731,7 @@ impl NetworkProtocolHandler for LightProtocol {
|
||||
match timer {
|
||||
TIMEOUT => self.timeout_check(io),
|
||||
TICK_TIMEOUT => self.tick_handlers(io),
|
||||
_ => warn!(target: "les", "received timeout on unknown token {}", timer),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper for encoding the request to RLP with the given ID.
|
||||
fn encode_request(req: &Request, req_id: usize) -> Vec<u8> {
|
||||
match *req {
|
||||
Request::Headers(ref headers) => {
|
||||
let mut stream = RlpStream::new_list(2);
|
||||
stream.append(&req_id).begin_list(4);
|
||||
|
||||
match headers.start {
|
||||
HashOrNumber::Hash(ref hash) => stream.append(hash),
|
||||
HashOrNumber::Number(ref num) => stream.append(num),
|
||||
};
|
||||
|
||||
stream
|
||||
.append(&headers.max)
|
||||
.append(&headers.skip)
|
||||
.append(&headers.reverse);
|
||||
|
||||
stream.out()
|
||||
}
|
||||
Request::Bodies(ref request) => {
|
||||
let mut stream = RlpStream::new_list(2);
|
||||
stream.append(&req_id).begin_list(request.block_hashes.len());
|
||||
|
||||
for hash in &request.block_hashes {
|
||||
stream.append(hash);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
}
|
||||
Request::Receipts(ref request) => {
|
||||
let mut stream = RlpStream::new_list(2);
|
||||
stream.append(&req_id).begin_list(request.block_hashes.len());
|
||||
|
||||
for hash in &request.block_hashes {
|
||||
stream.append(hash);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
}
|
||||
Request::StateProofs(ref request) => {
|
||||
let mut stream = RlpStream::new_list(2);
|
||||
stream.append(&req_id).begin_list(request.requests.len());
|
||||
|
||||
for proof_req in &request.requests {
|
||||
stream.begin_list(4)
|
||||
.append(&proof_req.block)
|
||||
.append(&proof_req.key1);
|
||||
|
||||
match proof_req.key2 {
|
||||
Some(ref key2) => stream.append(key2),
|
||||
None => stream.append_empty_data(),
|
||||
};
|
||||
|
||||
stream.append(&proof_req.from_level);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
}
|
||||
Request::Codes(ref request) => {
|
||||
let mut stream = RlpStream::new_list(2);
|
||||
stream.append(&req_id).begin_list(request.code_requests.len());
|
||||
|
||||
for code_req in &request.code_requests {
|
||||
stream.begin_list(2)
|
||||
.append(&code_req.block_hash)
|
||||
.append(&code_req.account_key);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
}
|
||||
Request::HeaderProofs(ref request) => {
|
||||
let mut stream = RlpStream::new_list(2);
|
||||
stream.append(&req_id).begin_list(request.requests.len());
|
||||
|
||||
for proof_req in &request.requests {
|
||||
stream.begin_list(3)
|
||||
.append(&proof_req.cht_number)
|
||||
.append(&proof_req.block_number)
|
||||
.append(&proof_req.from_level);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
}
|
||||
Request::TransactionProof(ref request) => {
|
||||
let mut stream = RlpStream::new_list(2);
|
||||
stream.append(&req_id).begin_list(7)
|
||||
.append(&request.at)
|
||||
.append(&request.from);
|
||||
|
||||
match request.action {
|
||||
Action::Create => stream.append_empty_data(),
|
||||
Action::Call(ref to) => stream.append(to),
|
||||
};
|
||||
|
||||
stream
|
||||
.append(&request.gas)
|
||||
.append(&request.gas_price)
|
||||
.append(&request.value)
|
||||
.append(&request.data);
|
||||
|
||||
stream.out()
|
||||
_ => warn!(target: "pip", "received timeout on unknown token {}", timer),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user