buffer flow basics, implement cost table

This commit is contained in:
Robert Habermeier 2016-11-07 15:40:34 +01:00
parent d573ef3cc2
commit 051effe9f8
5 changed files with 258 additions and 97 deletions

View File

@ -12,4 +12,5 @@ ethcore = { path = ".." }
ethcore-util = { path = "../../util" }
ethcore-network = { path = "../../util/network" }
ethcore-io = { path = "../../util/io" }
rlp = { path = "../../util/rlp" }
rlp = { path = "../../util/rlp" }
time = "0.1"

View File

@ -38,6 +38,7 @@ extern crate ethcore_network as network;
extern crate ethcore_io as io;
extern crate ethcore;
extern crate rlp;
extern crate time;
#[macro_use]
extern crate log;

View File

@ -24,19 +24,245 @@
//! flow costs and recharge rates.
use request::{self, Request};
use super::packet;
/// Manages buffer flow costs for specific requests.
pub struct FlowManager;
use rlp::*;
use util::U256;
use time::{Duration, SteadyTime};
impl FlowManager {
/// Estimate the maximum cost of this request.
pub fn estimate_cost(&self, req: &request::Request) -> usize {
unimplemented!()
/// A request cost specification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cost(pub U256, pub U256);
/// An error: insufficient buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InsufficientBuffer;
/// Buffer value.
///
/// Produced and recharged using `FlowParams`.
/// Definitive updates can be made as well -- these will reset the recharge
/// point to the time of the update.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Buffer {
estimate: U256,
recharge_point: SteadyTime,
}
impl Buffer {
/// Make a definitive update.
/// This will be the value obtained after receiving
/// a response to a request.
pub fn update_to(&mut self, value: U256) {
self.estimate = value;
self.recharge_point = SteadyTime::now();
}
/// Get an exact cost based on request kind and amount of requests fulfilled.
pub fn exact_cost(&self, kind: request::Kind, amount: usize) -> usize {
unimplemented!()
/// Attempt to apply the given cost to the buffer.
/// If successful, the cost will be deducted successfully.
/// If unsuccessful, the structure will be unaltered an an
/// error will be produced.
pub fn deduct_cost(&mut self, cost: U256) -> Result<(), InsufficientBuffer> {
match cost > self.estimate {
true => Err(InsufficientBuffer),
false => {
self.estimate = self.estimate - cost;
Ok(())
}
}
}
}
/// A cost table, mapping requests to base and per-request costs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CostTable {
headers: Cost,
bodies: Cost,
receipts: Cost,
state_proofs: Cost,
contract_codes: Cost,
header_proofs: Cost,
}
impl Default for CostTable {
fn default() -> Self {
// arbitrarily chosen constants.
CostTable {
headers: Cost(100000.into(), 10000.into()),
bodies: Cost(150000.into(), 15000.into()),
receipts: Cost(50000.into(), 5000.into()),
state_proofs: Cost(250000.into(), 25000.into()),
contract_codes: Cost(200000.into(), 20000.into()),
header_proofs: Cost(150000.into(), 15000.into()),
}
}
}
impl RlpEncodable for CostTable {
fn rlp_append(&self, s: &mut RlpStream) {
fn append_cost(s: &mut RlpStream, msg_id: u8, cost: &Cost) {
s.begin_list(3)
.append(&msg_id)
.append(&cost.0)
.append(&cost.1);
}
s.begin_list(6);
append_cost(s, packet::GET_BLOCK_HEADERS, &self.headers);
append_cost(s, packet::GET_BLOCK_BODIES, &self.bodies);
append_cost(s, packet::GET_RECEIPTS, &self.receipts);
append_cost(s, packet::GET_PROOFS, &self.state_proofs);
append_cost(s, packet::GET_CONTRACT_CODES, &self.contract_codes);
append_cost(s, packet::GET_HEADER_PROOFS, &self.header_proofs);
}
}
impl RlpDecodable for CostTable {
fn decode<D>(decoder: &D) -> Result<Self, DecoderError> where D: Decoder {
let rlp = decoder.as_rlp();
let mut headers = None;
let mut bodies = None;
let mut receipts = None;
let mut state_proofs = None;
let mut contract_codes = None;
let mut header_proofs = None;
for row in rlp.iter() {
let msg_id: u8 = try!(row.val_at(0));
let cost = {
let base = try!(row.val_at(1));
let per = try!(row.val_at(2));
Cost(base, per)
};
match msg_id {
packet::GET_BLOCK_HEADERS => headers = Some(cost),
packet::GET_BLOCK_BODIES => bodies = Some(cost),
packet::GET_RECEIPTS => receipts = Some(cost),
packet::GET_PROOFS => state_proofs = Some(cost),
packet::GET_CONTRACT_CODES => contract_codes = Some(cost),
packet::GET_HEADER_PROOFS => header_proofs = Some(cost),
_ => return Err(DecoderError::Custom("Unrecognized message in cost table")),
}
}
Ok(CostTable {
headers: try!(headers.ok_or(DecoderError::Custom("No headers cost specified"))),
bodies: try!(bodies.ok_or(DecoderError::Custom("No bodies cost specified"))),
receipts: try!(receipts.ok_or(DecoderError::Custom("No receipts cost specified"))),
state_proofs: try!(state_proofs.ok_or(DecoderError::Custom("No proofs cost specified"))),
contract_codes: try!(contract_codes.ok_or(DecoderError::Custom("No contract codes specified"))),
header_proofs: try!(header_proofs.ok_or(DecoderError::Custom("No header proofs cost specified"))),
})
}
}
/// A buffer-flow manager handles costs, recharge, limits
#[derive(Debug, Clone, PartialEq)]
pub struct FlowParams {
costs: CostTable,
limit: U256,
recharge: U256,
}
impl FlowParams {
/// Create new flow parameters from a request cost table,
/// buffer limit, and (minimum) rate of recharge.
pub fn new(costs: CostTable, limit: U256, recharge: U256) -> Self {
FlowParams {
costs: costs,
limit: limit,
recharge: recharge,
}
}
/// Estimate the maximum cost of the request.
pub fn max_cost(&self, req: &Request) -> U256 {
let amount = match *req {
Request::Headers(ref req) => req.max as usize,
Request::Bodies(ref req) => req.block_hashes.len(),
Request::Receipts(ref req) => req.block_hashes.len(),
Request::StateProofs(ref req) => req.requests.len(),
Request::Codes(ref req) => req.code_requests.len(),
Request::HeaderProofs(ref req) => req.requests.len(),
};
self.actual_cost(req.kind(), amount)
}
/// Compute the actual cost of a request, given the kind of request
/// and number of requests made.
pub fn actual_cost(&self, kind: request::Kind, amount: usize) -> U256 {
let cost = match kind {
request::Kind::Headers => &self.costs.headers,
request::Kind::Bodies => &self.costs.bodies,
request::Kind::Receipts => &self.costs.receipts,
request::Kind::StateProofs => &self.costs.state_proofs,
request::Kind::Codes => &self.costs.contract_codes,
request::Kind::HeaderProofs => &self.costs.header_proofs,
};
let amount: U256 = amount.into();
cost.0 + (amount * cost.1)
}
/// Create initial buffer parameter.
pub fn create_buffer(&self) -> Buffer {
Buffer {
estimate: self.limit,
recharge_point: SteadyTime::now(),
}
}
/// Recharge the buffer based on time passed since last
/// update.
pub fn recharge(&self, buf: &mut Buffer) {
let now = SteadyTime::now();
// recompute and update only in terms of full seconds elapsed
// in order to keep the estimate as an underestimate.
let elapsed = (now - buf.recharge_point).num_seconds();
buf.recharge_point = buf.recharge_point + Duration::seconds(elapsed);
let elapsed: U256 = elapsed.into();
buf.estimate = ::std::cmp::min(self.limit, buf.estimate + (elapsed * self.recharge));
}
}
#[cfg(test)]
mod tests {
use super::*;
use util::U256;
#[test]
fn should_serialize_cost_table() {
let costs = CostTable::default();
let serialized = ::rlp::encode(&costs);
let new_costs: CostTable = ::rlp::decode(&*serialized);
assert_eq!(costs, new_costs);
}
#[test]
fn buffer_mechanism() {
use std::thread;
use std::time::Duration;
let flow_params = FlowParams::new(Default::default(), 100.into(), 20.into());
let mut buffer = flow_params.create_buffer();
assert!(buffer.deduct_cost(101.into()).is_err());
assert!(buffer.deduct_cost(10.into()).is_ok());
thread::sleep(Duration::from_secs(1));
flow_params.recharge(&mut buffer);
assert_eq!(buffer.estimate, 100.into());
}
}

View File

@ -30,7 +30,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use provider::Provider;
use request::{self, Request};
use self::buffer_flow::FlowManager;
use self::buffer_flow::FlowParams;
mod buffer_flow;
@ -77,33 +77,6 @@ mod packet {
// request and response for header proofs in a CHT.
pub const GET_HEADER_PROOFS: u8 = 0x0d;
pub const HEADER_PROOFS: u8 = 0x0e;
// broadcast dynamic capabilities.
pub const CAPABILITIES: u8 = 0x0f;
// request and response for block-level state deltas.
pub const GET_BLOCK_DELTAS: u8 = 0x10;
pub const BLOCK_DELTAS: u8 = 0x11;
// request and response for transaction proofs.
pub const GET_TRANSACTION_PROOFS: u8 = 0x12;
pub const TRANSACTION_PROOFS: u8 = 0x13;
}
// helper macro for disconnecting peer on error while returning
// the value if ok.
// requires that error types are debug.
macro_rules! try_dc {
($io: expr, $peer: expr, $e: expr) => {
match $e {
Ok(x) => x,
Err(e) => {
debug!(target: "les", "disconnecting peer {} due to error {:?}", $peer, e);
$io.disconnect_peer($peer);
return;
}
}
}
}
struct Requested {
@ -209,26 +182,7 @@ impl LightProtocol {
fn get_block_headers(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) {
const MAX_HEADERS: u64 = 512;
let req_id: u64 = try_dc!(io, *peer, data.val_at(0));
let req = request::Headers {
block: try_dc!(io, *peer, data.at(1).and_then(|block_list| {
Ok((try!(block_list.val_at(0)), try!(block_list.val_at(1))))
})),
max: ::std::cmp::min(MAX_HEADERS, try_dc!(io, *peer, data.val_at(2))),
skip: try_dc!(io, *peer, data.val_at(3)),
reverse: try_dc!(io, *peer, data.val_at(4)),
};
let res = self.provider.block_headers(req);
let mut res_stream = RlpStream::new_list(2 + res.len());
res_stream.append(&req_id);
res_stream.append(&0u64); // TODO: Buffer Flow.
for raw_header in res {
res_stream.append_raw(&raw_header, 1);
}
try_dc!(io, *peer, io.respond(packet::BLOCK_HEADERS, res_stream.out()))
unimplemented!()
}
// Receive a response for block headers.
@ -292,31 +246,6 @@ impl LightProtocol {
fn relay_transactions(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) {
unimplemented!()
}
// Receive updated capabilities from a peer.
fn capabilities(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) {
unimplemented!()
}
// Handle a request for block deltas.
fn get_block_deltas(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) {
unimplemented!()
}
// Receive block deltas.
fn block_deltas(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) {
unimplemented!()
}
// Handle a request for transaction proofs.
fn get_transaction_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) {
unimplemented!()
}
// Receive transaction proofs.
fn transaction_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) {
unimplemented!()
}
}
impl NetworkProtocolHandler for LightProtocol {
@ -346,16 +275,6 @@ impl NetworkProtocolHandler for LightProtocol {
packet::CONTRACT_CODES => self.contract_code(peer, io, rlp),
packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp),
packet::CAPABILITIES => self.capabilities(peer, io, rlp),
packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp),
packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp),
packet::GET_BLOCK_DELTAS => self.get_block_deltas(peer, io, rlp),
packet::BLOCK_DELTAS => self.block_deltas(peer, io, rlp),
packet::GET_TRANSACTION_PROOFS => self.get_transaction_proofs(peer, io, rlp),
packet::TRANSACTION_PROOFS => self.transaction_proofs(peer, io, rlp),
other => {
debug!(target: "les", "Disconnecting peer {} on unexpected packet {}", peer, other);

View File

@ -51,9 +51,9 @@ pub struct Receipts {
pub block_hashes: Vec<H256>,
}
/// A request for state proofs.
/// A request for a state proof
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StateProofs {
pub struct StateProof {
/// Block hash to query state from.
pub block: H256,
/// Key of the state trie -- corresponds to account hash.
@ -65,6 +65,13 @@ pub struct StateProofs {
pub from_level: u32, // could even safely be u8; trie w/ 32-byte key can be at most 64-levels deep.
}
/// A request for state proofs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StateProofs {
/// All the proof requests.
pub requests: Vec<StateProof>,
}
/// A request for contract code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContractCodes {
@ -72,9 +79,9 @@ pub struct ContractCodes {
pub code_requests: Vec<(H256, H256)>,
}
/// A request for header proofs from the Canonical Hash Trie.
/// A request for a header proof from the Canonical Hash Trie.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeaderProofs {
pub struct HeaderProof {
/// Number of the CHT.
pub cht_number: u64,
/// Block number requested.
@ -83,6 +90,13 @@ pub struct HeaderProofs {
pub from_level: u32,
}
/// A request for header proofs from the CHT.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeaderProofs {
/// All the proof requests.
pub requests: Vec<HeaderProofs>,
}
/// Kinds of requests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Kind {