Remote transaction execution (#4684)
* return errors on database corruption * fix tests, json tests * fix remainder of build * buffer flow -> request credits * proving state backend * generate transaction proofs from provider * network messages for transaction proof * transaction proof test * test for transaction proof message * fix call bug * request transaction proofs from on_demand * most of proved_execution rpc * proved execution future
This commit is contained in:
committed by
Gav Wood
parent
5bbcf0482b
commit
8a3b5c6332
@@ -19,14 +19,14 @@
|
||||
//! This uses a "Provider" to answer requests.
|
||||
//! See https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES)
|
||||
|
||||
use ethcore::transaction::UnverifiedTransaction;
|
||||
use ethcore::transaction::{Action, UnverifiedTransaction};
|
||||
use ethcore::receipt::Receipt;
|
||||
|
||||
use io::TimerToken;
|
||||
use network::{NetworkProtocolHandler, NetworkContext, PeerId};
|
||||
use rlp::{RlpStream, Stream, UntrustedRlp, View};
|
||||
use util::hash::H256;
|
||||
use util::{Bytes, Mutex, RwLock, U256};
|
||||
use util::{Bytes, DBValue, Mutex, RwLock, U256};
|
||||
use time::{Duration, SteadyTime};
|
||||
|
||||
use std::collections::HashMap;
|
||||
@@ -37,7 +37,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use provider::Provider;
|
||||
use request::{self, HashOrNumber, Request};
|
||||
|
||||
use self::buffer_flow::{Buffer, FlowParams};
|
||||
use self::request_credits::{Credits, FlowParams};
|
||||
use self::context::{Ctx, TickCtx};
|
||||
use self::error::Punishment;
|
||||
use self::request_set::RequestSet;
|
||||
@@ -51,7 +51,7 @@ mod request_set;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
pub mod buffer_flow;
|
||||
pub mod request_credits;
|
||||
|
||||
pub use self::error::Error;
|
||||
pub use self::context::{BasicContext, EventContext, IoContext};
|
||||
@@ -73,7 +73,7 @@ pub const PROTOCOL_VERSIONS: &'static [u8] = &[1];
|
||||
pub const MAX_PROTOCOL_VERSION: u8 = 1;
|
||||
|
||||
/// Packet count for LES.
|
||||
pub const PACKET_COUNT: u8 = 15;
|
||||
pub const PACKET_COUNT: u8 = 17;
|
||||
|
||||
// packet ID definitions.
|
||||
mod packet {
|
||||
@@ -109,6 +109,10 @@ mod packet {
|
||||
// request and response for header proofs in a CHT.
|
||||
pub const GET_HEADER_PROOFS: u8 = 0x0d;
|
||||
pub const HEADER_PROOFS: u8 = 0x0e;
|
||||
|
||||
// request and response for transaction proof.
|
||||
pub const GET_TRANSACTION_PROOF: u8 = 0x0f;
|
||||
pub const TRANSACTION_PROOF: u8 = 0x10;
|
||||
}
|
||||
|
||||
// timeouts for different kinds of requests. all values are in milliseconds.
|
||||
@@ -121,6 +125,7 @@ mod timeout {
|
||||
pub const PROOFS: i64 = 4000;
|
||||
pub const CONTRACT_CODES: i64 = 5000;
|
||||
pub const HEADER_PROOFS: i64 = 3500;
|
||||
pub const TRANSACTION_PROOF: i64 = 5000;
|
||||
}
|
||||
|
||||
/// A request id.
|
||||
@@ -143,10 +148,10 @@ struct PendingPeer {
|
||||
/// Relevant data to each peer. Not accessible publicly, only `pub` due to
|
||||
/// limitations of the privacy system.
|
||||
pub struct Peer {
|
||||
local_buffer: Buffer, // their buffer relative to us
|
||||
local_credits: Credits, // their credits relative to us
|
||||
status: Status,
|
||||
capabilities: Capabilities,
|
||||
remote_flow: Option<(Buffer, FlowParams)>,
|
||||
remote_flow: Option<(Credits, FlowParams)>,
|
||||
sent_head: H256, // last chain head we've given them.
|
||||
last_update: SteadyTime,
|
||||
pending_requests: RequestSet,
|
||||
@@ -155,21 +160,21 @@ pub struct Peer {
|
||||
|
||||
impl Peer {
|
||||
// check the maximum cost of a request, returning an error if there's
|
||||
// not enough buffer left.
|
||||
// not enough credits left.
|
||||
// returns the calculated maximum cost.
|
||||
fn deduct_max(&mut self, flow_params: &FlowParams, kind: request::Kind, max: usize) -> Result<U256, Error> {
|
||||
flow_params.recharge(&mut self.local_buffer);
|
||||
flow_params.recharge(&mut self.local_credits);
|
||||
|
||||
let max_cost = flow_params.compute_cost(kind, max);
|
||||
self.local_buffer.deduct_cost(max_cost)?;
|
||||
self.local_credits.deduct_cost(max_cost)?;
|
||||
Ok(max_cost)
|
||||
}
|
||||
|
||||
// refund buffer for a request. returns new buffer amount.
|
||||
// refund credits for a request. returns new amount of credits.
|
||||
fn refund(&mut self, flow_params: &FlowParams, amount: U256) -> U256 {
|
||||
flow_params.refund(&mut self.local_buffer, amount);
|
||||
flow_params.refund(&mut self.local_credits, amount);
|
||||
|
||||
self.local_buffer.current()
|
||||
self.local_credits.current()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -206,6 +211,8 @@ pub trait Handler: Send + Sync {
|
||||
/// Called when a peer responds with header proofs. Each proof should be a block header coupled
|
||||
/// with a series of trie nodes is ascending order by distance from the root.
|
||||
fn on_header_proofs(&self, _ctx: &EventContext, _req_id: ReqId, _proofs: &[(Bytes, Vec<Bytes>)]) { }
|
||||
/// Called when a peer responds with a transaction proof. Each proof is a vector of state items.
|
||||
fn on_transaction_proof(&self, _ctx: &EventContext, _req_id: ReqId, _state_items: &[DBValue]) { }
|
||||
/// Called to "tick" the handler periodically.
|
||||
fn tick(&self, _ctx: &BasicContext) { }
|
||||
/// Called on abort. This signals to handlers that they should clean up
|
||||
@@ -218,7 +225,7 @@ pub trait Handler: Send + Sync {
|
||||
pub struct Params {
|
||||
/// Network id.
|
||||
pub network_id: u64,
|
||||
/// Buffer flow parameters.
|
||||
/// Request credits parameters.
|
||||
pub flow_params: FlowParams,
|
||||
/// Initial capabilities.
|
||||
pub capabilities: Capabilities,
|
||||
@@ -334,14 +341,14 @@ impl LightProtocol {
|
||||
|
||||
/// Check the maximum amount of requests of a specific type
|
||||
/// which a peer would be able to serve. Returns zero if the
|
||||
/// peer is unknown or has no buffer flow parameters.
|
||||
/// peer is unknown or has no credit parameters.
|
||||
fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize {
|
||||
self.peers.read().get(&peer).and_then(|peer| {
|
||||
let mut peer = peer.lock();
|
||||
match peer.remote_flow {
|
||||
Some((ref mut buf, ref flow)) => {
|
||||
flow.recharge(buf);
|
||||
Some(flow.max_amount(&*buf, kind))
|
||||
Some((ref mut c, ref flow)) => {
|
||||
flow.recharge(c);
|
||||
Some(flow.max_amount(&*c, kind))
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
@@ -351,7 +358,7 @@ impl LightProtocol {
|
||||
/// Make a request to a peer.
|
||||
///
|
||||
/// Fails on: nonexistent peer, network error, peer not server,
|
||||
/// insufficient buffer. Does not check capabilities before sending.
|
||||
/// insufficient credits. Does not check capabilities before sending.
|
||||
/// On success, returns a request id which can later be coordinated
|
||||
/// with an event.
|
||||
pub fn request_from(&self, io: &IoContext, peer_id: &PeerId, request: Request) -> Result<ReqId, Error> {
|
||||
@@ -360,10 +367,10 @@ impl LightProtocol {
|
||||
let mut peer = peer.lock();
|
||||
|
||||
match peer.remote_flow {
|
||||
Some((ref mut buf, ref flow)) => {
|
||||
flow.recharge(buf);
|
||||
Some((ref mut c, ref flow)) => {
|
||||
flow.recharge(c);
|
||||
let max = flow.compute_cost(request.kind(), request.amount());
|
||||
buf.deduct_cost(max)?;
|
||||
c.deduct_cost(max)?;
|
||||
}
|
||||
None => return Err(Error::NotServer),
|
||||
}
|
||||
@@ -380,6 +387,7 @@ impl LightProtocol {
|
||||
request::Kind::StateProofs => packet::GET_PROOFS,
|
||||
request::Kind::Codes => packet::GET_CONTRACT_CODES,
|
||||
request::Kind::HeaderProofs => packet::GET_HEADER_PROOFS,
|
||||
request::Kind::TransactionProof => packet::GET_TRANSACTION_PROOF,
|
||||
};
|
||||
|
||||
io.send(*peer_id, packet_id, packet_data);
|
||||
@@ -464,7 +472,7 @@ impl LightProtocol {
|
||||
// - check whether request kinds match
|
||||
fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result<IdGuard, Error> {
|
||||
let req_id = ReqId(raw.val_at(0)?);
|
||||
let cur_buffer: U256 = raw.val_at(1)?;
|
||||
let cur_credits: U256 = raw.val_at(1)?;
|
||||
|
||||
trace!(target: "les", "pre-verifying response from peer {}, kind={:?}", peer, kind);
|
||||
|
||||
@@ -480,9 +488,9 @@ impl LightProtocol {
|
||||
(Some(request), Some(flow_info)) => {
|
||||
had_req = true;
|
||||
|
||||
let &mut (ref mut buf, ref mut flow) = flow_info;
|
||||
let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit());
|
||||
buf.update_to(actual_buffer);
|
||||
let &mut (ref mut c, ref mut flow) = flow_info;
|
||||
let actual_credits = ::std::cmp::min(cur_credits, *flow.limit());
|
||||
c.update_to(actual_credits);
|
||||
|
||||
if request.kind() != kind {
|
||||
Some(Error::UnsolicitedResponse)
|
||||
@@ -539,6 +547,9 @@ impl LightProtocol {
|
||||
packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp),
|
||||
packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp),
|
||||
|
||||
packet::GET_TRANSACTION_PROOF => self.get_transaction_proof(peer, io, rlp),
|
||||
packet::TRANSACTION_PROOF => self.transaction_proof(peer, io, rlp),
|
||||
|
||||
packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp),
|
||||
|
||||
other => {
|
||||
@@ -685,10 +696,10 @@ impl LightProtocol {
|
||||
return Err(Error::BadProtocolVersion);
|
||||
}
|
||||
|
||||
let remote_flow = flow_params.map(|params| (params.create_buffer(), params));
|
||||
let remote_flow = flow_params.map(|params| (params.create_credits(), params));
|
||||
|
||||
self.peers.write().insert(*peer, Mutex::new(Peer {
|
||||
local_buffer: self.flow_params.create_buffer(),
|
||||
local_credits: self.flow_params.create_credits(),
|
||||
status: status.clone(),
|
||||
capabilities: capabilities.clone(),
|
||||
remote_flow: remote_flow,
|
||||
@@ -793,10 +804,10 @@ impl LightProtocol {
|
||||
let actual_cost = self.flow_params.compute_cost(request::Kind::Headers, response.len());
|
||||
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
|
||||
|
||||
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
io.respond(packet::BLOCK_HEADERS, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for header in response {
|
||||
stream.append_raw(&header.into_inner(), 1);
|
||||
@@ -855,11 +866,11 @@ impl LightProtocol {
|
||||
let actual_cost = self.flow_params.compute_cost(request::Kind::Bodies, response_len);
|
||||
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
|
||||
|
||||
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
|
||||
io.respond(packet::BLOCK_BODIES, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for body in response {
|
||||
match body {
|
||||
@@ -921,11 +932,11 @@ impl LightProtocol {
|
||||
let actual_cost = self.flow_params.compute_cost(request::Kind::Receipts, response_len);
|
||||
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
|
||||
|
||||
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
|
||||
io.respond(packet::RECEIPTS, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for receipts in response {
|
||||
stream.append_raw(&receipts, 1);
|
||||
@@ -995,11 +1006,11 @@ impl LightProtocol {
|
||||
let actual_cost = self.flow_params.compute_cost(request::Kind::StateProofs, response_len);
|
||||
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
|
||||
|
||||
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
|
||||
io.respond(packet::PROOFS, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for proof in response {
|
||||
stream.append_raw(&proof, 1);
|
||||
@@ -1067,11 +1078,11 @@ impl LightProtocol {
|
||||
let actual_cost = self.flow_params.compute_cost(request::Kind::Codes, response_len);
|
||||
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
|
||||
|
||||
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
|
||||
io.respond(packet::CONTRACT_CODES, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for code in response {
|
||||
stream.append(&code);
|
||||
@@ -1140,11 +1151,11 @@ impl LightProtocol {
|
||||
let actual_cost = self.flow_params.compute_cost(request::Kind::HeaderProofs, response_len);
|
||||
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
|
||||
|
||||
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
|
||||
|
||||
io.respond(packet::HEADER_PROOFS, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for proof in response {
|
||||
stream.append_raw(&proof, 1);
|
||||
@@ -1182,6 +1193,90 @@ impl LightProtocol {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Receive a request for proof-of-execution.
|
||||
fn get_transaction_proof(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||
// refuse to execute more than this amount of gas at once.
|
||||
// this is appx. the point at which the proof of execution would no longer fit in
|
||||
// a single Devp2p packet.
|
||||
const MAX_GAS: usize = 50_000_000;
|
||||
use util::Uint;
|
||||
|
||||
let peers = self.peers.read();
|
||||
let peer = match peers.get(peer) {
|
||||
Some(peer) => peer,
|
||||
None => {
|
||||
debug!(target: "les", "Ignoring request from unknown peer");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
let mut peer = peer.lock();
|
||||
|
||||
let req_id: u64 = raw.val_at(0)?;
|
||||
|
||||
let req = {
|
||||
let req_rlp = raw.at(1)?;
|
||||
request::TransactionProof {
|
||||
at: req_rlp.val_at(0)?,
|
||||
from: req_rlp.val_at(1)?,
|
||||
action: if req_rlp.at(2)?.is_empty() {
|
||||
Action::Create
|
||||
} else {
|
||||
Action::Call(req_rlp.val_at(2)?)
|
||||
},
|
||||
gas: ::std::cmp::min(req_rlp.val_at(3)?, MAX_GAS.into()),
|
||||
gas_price: req_rlp.val_at(4)?,
|
||||
value: req_rlp.val_at(5)?,
|
||||
data: req_rlp.val_at(6)?,
|
||||
}
|
||||
};
|
||||
|
||||
// always charge the peer for all the gas.
|
||||
peer.deduct_max(&self.flow_params, request::Kind::TransactionProof, req.gas.low_u64() as usize)?;
|
||||
|
||||
let response = match self.provider.transaction_proof(req) {
|
||||
Some(res) => res,
|
||||
None => vec![],
|
||||
};
|
||||
|
||||
let cur_credits = peer.local_credits.current();
|
||||
|
||||
io.respond(packet::TRANSACTION_PROOF, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
|
||||
|
||||
for state_item in response {
|
||||
stream.append(&&state_item[..]);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Receive a response for proof-of-execution.
|
||||
fn transaction_proof(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||
let id_guard = self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw)?;
|
||||
let raw_proof: Vec<DBValue> = raw.at(2)?.iter()
|
||||
.map(|rlp| {
|
||||
let mut db_val = DBValue::new();
|
||||
db_val.append_slice(rlp.data()?);
|
||||
Ok(db_val)
|
||||
})
|
||||
.collect::<Result<Vec<_>, ::rlp::DecoderError>>()?;
|
||||
|
||||
let req_id = id_guard.defuse();
|
||||
for handler in &self.handlers {
|
||||
handler.on_transaction_proof(&Ctx {
|
||||
peer: *peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
}, req_id, &raw_proof);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Receive a set of transactions to relay.
|
||||
fn relay_transactions(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {
|
||||
const MAX_TRANSACTIONS: usize = 256;
|
||||
@@ -1330,6 +1425,25 @@ fn encode_request(req: &Request, req_id: usize) -> Vec<u8> {
|
||||
.append(&proof_req.from_level);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
}
|
||||
Request::TransactionProof(ref request) => {
|
||||
let mut stream = RlpStream::new_list(2);
|
||||
stream.append(&req_id).begin_list(7)
|
||||
.append(&request.at)
|
||||
.append(&request.from);
|
||||
|
||||
match request.action {
|
||||
Action::Create => stream.append_empty_data(),
|
||||
Action::Call(ref to) => stream.append(to),
|
||||
};
|
||||
|
||||
stream
|
||||
.append(&request.gas)
|
||||
.append(&request.gas_price)
|
||||
.append(&request.value)
|
||||
.append(&request.data);
|
||||
|
||||
stream.out()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user