From 051effe9f8ceca371eaf4ca7c9c7898c956d211a Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 7 Nov 2016 15:40:34 +0100 Subject: [PATCH] buffer flow basics, implement cost table --- ethcore/light/Cargo.toml | 3 +- ethcore/light/src/lib.rs | 1 + ethcore/light/src/net/buffer_flow.rs | 244 ++++++++++++++++++++++++++- ethcore/light/src/net/mod.rs | 85 +--------- ethcore/light/src/request.rs | 22 ++- 5 files changed, 258 insertions(+), 97 deletions(-) diff --git a/ethcore/light/Cargo.toml b/ethcore/light/Cargo.toml index dcfb4760a..daf141de7 100644 --- a/ethcore/light/Cargo.toml +++ b/ethcore/light/Cargo.toml @@ -12,4 +12,5 @@ ethcore = { path = ".." } ethcore-util = { path = "../../util" } ethcore-network = { path = "../../util/network" } ethcore-io = { path = "../../util/io" } -rlp = { path = "../../util/rlp" } \ No newline at end of file +rlp = { path = "../../util/rlp" } +time = "0.1" \ No newline at end of file diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index 63586f5a2..f6d05c65c 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -38,6 +38,7 @@ extern crate ethcore_network as network; extern crate ethcore_io as io; extern crate ethcore; extern crate rlp; +extern crate time; #[macro_use] extern crate log; \ No newline at end of file diff --git a/ethcore/light/src/net/buffer_flow.rs b/ethcore/light/src/net/buffer_flow.rs index ece9f7057..deed1cded 100644 --- a/ethcore/light/src/net/buffer_flow.rs +++ b/ethcore/light/src/net/buffer_flow.rs @@ -24,19 +24,245 @@ //! flow costs and recharge rates. use request::{self, Request}; +use super::packet; -/// Manages buffer flow costs for specific requests. -pub struct FlowManager; +use rlp::*; +use util::U256; +use time::{Duration, SteadyTime}; -impl FlowManager { - /// Estimate the maximum cost of this request. - pub fn estimate_cost(&self, req: &request::Request) -> usize { - unimplemented!() +/// A request cost specification. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Cost(pub U256, pub U256); + +/// An error: insufficient buffer. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct InsufficientBuffer; + +/// Buffer value. +/// +/// Produced and recharged using `FlowParams`. +/// Definitive updates can be made as well -- these will reset the recharge +/// point to the time of the update. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Buffer { + estimate: U256, + recharge_point: SteadyTime, +} + +impl Buffer { + /// Make a definitive update. + /// This will be the value obtained after receiving + /// a response to a request. + pub fn update_to(&mut self, value: U256) { + self.estimate = value; + self.recharge_point = SteadyTime::now(); } - /// Get an exact cost based on request kind and amount of requests fulfilled. - pub fn exact_cost(&self, kind: request::Kind, amount: usize) -> usize { - unimplemented!() + /// Attempt to apply the given cost to the buffer. + /// If successful, the cost will be deducted successfully. + /// If unsuccessful, the structure will be unaltered an an + /// error will be produced. + pub fn deduct_cost(&mut self, cost: U256) -> Result<(), InsufficientBuffer> { + match cost > self.estimate { + true => Err(InsufficientBuffer), + false => { + self.estimate = self.estimate - cost; + Ok(()) + } + } } } +/// A cost table, mapping requests to base and per-request costs. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CostTable { + headers: Cost, + bodies: Cost, + receipts: Cost, + state_proofs: Cost, + contract_codes: Cost, + header_proofs: Cost, +} + +impl Default for CostTable { + fn default() -> Self { + // arbitrarily chosen constants. + CostTable { + headers: Cost(100000.into(), 10000.into()), + bodies: Cost(150000.into(), 15000.into()), + receipts: Cost(50000.into(), 5000.into()), + state_proofs: Cost(250000.into(), 25000.into()), + contract_codes: Cost(200000.into(), 20000.into()), + header_proofs: Cost(150000.into(), 15000.into()), + } + } +} + +impl RlpEncodable for CostTable { + fn rlp_append(&self, s: &mut RlpStream) { + fn append_cost(s: &mut RlpStream, msg_id: u8, cost: &Cost) { + s.begin_list(3) + .append(&msg_id) + .append(&cost.0) + .append(&cost.1); + } + + s.begin_list(6); + + append_cost(s, packet::GET_BLOCK_HEADERS, &self.headers); + append_cost(s, packet::GET_BLOCK_BODIES, &self.bodies); + append_cost(s, packet::GET_RECEIPTS, &self.receipts); + append_cost(s, packet::GET_PROOFS, &self.state_proofs); + append_cost(s, packet::GET_CONTRACT_CODES, &self.contract_codes); + append_cost(s, packet::GET_HEADER_PROOFS, &self.header_proofs); + } +} + +impl RlpDecodable for CostTable { + fn decode(decoder: &D) -> Result where D: Decoder { + let rlp = decoder.as_rlp(); + + let mut headers = None; + let mut bodies = None; + let mut receipts = None; + let mut state_proofs = None; + let mut contract_codes = None; + let mut header_proofs = None; + + for row in rlp.iter() { + let msg_id: u8 = try!(row.val_at(0)); + let cost = { + let base = try!(row.val_at(1)); + let per = try!(row.val_at(2)); + + Cost(base, per) + }; + + match msg_id { + packet::GET_BLOCK_HEADERS => headers = Some(cost), + packet::GET_BLOCK_BODIES => bodies = Some(cost), + packet::GET_RECEIPTS => receipts = Some(cost), + packet::GET_PROOFS => state_proofs = Some(cost), + packet::GET_CONTRACT_CODES => contract_codes = Some(cost), + packet::GET_HEADER_PROOFS => header_proofs = Some(cost), + _ => return Err(DecoderError::Custom("Unrecognized message in cost table")), + } + } + + Ok(CostTable { + headers: try!(headers.ok_or(DecoderError::Custom("No headers cost specified"))), + bodies: try!(bodies.ok_or(DecoderError::Custom("No bodies cost specified"))), + receipts: try!(receipts.ok_or(DecoderError::Custom("No receipts cost specified"))), + state_proofs: try!(state_proofs.ok_or(DecoderError::Custom("No proofs cost specified"))), + contract_codes: try!(contract_codes.ok_or(DecoderError::Custom("No contract codes specified"))), + header_proofs: try!(header_proofs.ok_or(DecoderError::Custom("No header proofs cost specified"))), + }) + } +} + +/// A buffer-flow manager handles costs, recharge, limits +#[derive(Debug, Clone, PartialEq)] +pub struct FlowParams { + costs: CostTable, + limit: U256, + recharge: U256, +} + +impl FlowParams { + /// Create new flow parameters from a request cost table, + /// buffer limit, and (minimum) rate of recharge. + pub fn new(costs: CostTable, limit: U256, recharge: U256) -> Self { + FlowParams { + costs: costs, + limit: limit, + recharge: recharge, + } + } + + /// Estimate the maximum cost of the request. + pub fn max_cost(&self, req: &Request) -> U256 { + let amount = match *req { + Request::Headers(ref req) => req.max as usize, + Request::Bodies(ref req) => req.block_hashes.len(), + Request::Receipts(ref req) => req.block_hashes.len(), + Request::StateProofs(ref req) => req.requests.len(), + Request::Codes(ref req) => req.code_requests.len(), + Request::HeaderProofs(ref req) => req.requests.len(), + }; + + self.actual_cost(req.kind(), amount) + } + + /// Compute the actual cost of a request, given the kind of request + /// and number of requests made. + pub fn actual_cost(&self, kind: request::Kind, amount: usize) -> U256 { + let cost = match kind { + request::Kind::Headers => &self.costs.headers, + request::Kind::Bodies => &self.costs.bodies, + request::Kind::Receipts => &self.costs.receipts, + request::Kind::StateProofs => &self.costs.state_proofs, + request::Kind::Codes => &self.costs.contract_codes, + request::Kind::HeaderProofs => &self.costs.header_proofs, + }; + + let amount: U256 = amount.into(); + cost.0 + (amount * cost.1) + } + + /// Create initial buffer parameter. + pub fn create_buffer(&self) -> Buffer { + Buffer { + estimate: self.limit, + recharge_point: SteadyTime::now(), + } + } + + /// Recharge the buffer based on time passed since last + /// update. + pub fn recharge(&self, buf: &mut Buffer) { + let now = SteadyTime::now(); + + // recompute and update only in terms of full seconds elapsed + // in order to keep the estimate as an underestimate. + let elapsed = (now - buf.recharge_point).num_seconds(); + buf.recharge_point = buf.recharge_point + Duration::seconds(elapsed); + + let elapsed: U256 = elapsed.into(); + + buf.estimate = ::std::cmp::min(self.limit, buf.estimate + (elapsed * self.recharge)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use util::U256; + + #[test] + fn should_serialize_cost_table() { + let costs = CostTable::default(); + let serialized = ::rlp::encode(&costs); + + let new_costs: CostTable = ::rlp::decode(&*serialized); + + assert_eq!(costs, new_costs); + } + + #[test] + fn buffer_mechanism() { + use std::thread; + use std::time::Duration; + + let flow_params = FlowParams::new(Default::default(), 100.into(), 20.into()); + let mut buffer = flow_params.create_buffer(); + + assert!(buffer.deduct_cost(101.into()).is_err()); + assert!(buffer.deduct_cost(10.into()).is_ok()); + + thread::sleep(Duration::from_secs(1)); + + flow_params.recharge(&mut buffer); + + assert_eq!(buffer.estimate, 100.into()); + } +} \ No newline at end of file diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 9b707db2f..34fe8acf2 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -30,7 +30,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use provider::Provider; use request::{self, Request}; -use self::buffer_flow::FlowManager; +use self::buffer_flow::FlowParams; mod buffer_flow; @@ -77,33 +77,6 @@ mod packet { // request and response for header proofs in a CHT. pub const GET_HEADER_PROOFS: u8 = 0x0d; pub const HEADER_PROOFS: u8 = 0x0e; - - // broadcast dynamic capabilities. - pub const CAPABILITIES: u8 = 0x0f; - - // request and response for block-level state deltas. - pub const GET_BLOCK_DELTAS: u8 = 0x10; - pub const BLOCK_DELTAS: u8 = 0x11; - - // request and response for transaction proofs. - pub const GET_TRANSACTION_PROOFS: u8 = 0x12; - pub const TRANSACTION_PROOFS: u8 = 0x13; -} - -// helper macro for disconnecting peer on error while returning -// the value if ok. -// requires that error types are debug. -macro_rules! try_dc { - ($io: expr, $peer: expr, $e: expr) => { - match $e { - Ok(x) => x, - Err(e) => { - debug!(target: "les", "disconnecting peer {} due to error {:?}", $peer, e); - $io.disconnect_peer($peer); - return; - } - } - } } struct Requested { @@ -209,26 +182,7 @@ impl LightProtocol { fn get_block_headers(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) { const MAX_HEADERS: u64 = 512; - let req_id: u64 = try_dc!(io, *peer, data.val_at(0)); - let req = request::Headers { - block: try_dc!(io, *peer, data.at(1).and_then(|block_list| { - Ok((try!(block_list.val_at(0)), try!(block_list.val_at(1)))) - })), - max: ::std::cmp::min(MAX_HEADERS, try_dc!(io, *peer, data.val_at(2))), - skip: try_dc!(io, *peer, data.val_at(3)), - reverse: try_dc!(io, *peer, data.val_at(4)), - }; - - let res = self.provider.block_headers(req); - - let mut res_stream = RlpStream::new_list(2 + res.len()); - res_stream.append(&req_id); - res_stream.append(&0u64); // TODO: Buffer Flow. - for raw_header in res { - res_stream.append_raw(&raw_header, 1); - } - - try_dc!(io, *peer, io.respond(packet::BLOCK_HEADERS, res_stream.out())) + unimplemented!() } // Receive a response for block headers. @@ -292,31 +246,6 @@ impl LightProtocol { fn relay_transactions(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) { unimplemented!() } - - // Receive updated capabilities from a peer. - fn capabilities(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) { - unimplemented!() - } - - // Handle a request for block deltas. - fn get_block_deltas(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) { - unimplemented!() - } - - // Receive block deltas. - fn block_deltas(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) { - unimplemented!() - } - - // Handle a request for transaction proofs. - fn get_transaction_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) { - unimplemented!() - } - - // Receive transaction proofs. - fn transaction_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) { - unimplemented!() - } } impl NetworkProtocolHandler for LightProtocol { @@ -346,16 +275,6 @@ impl NetworkProtocolHandler for LightProtocol { packet::CONTRACT_CODES => self.contract_code(peer, io, rlp), packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp), - packet::CAPABILITIES => self.capabilities(peer, io, rlp), - - packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp), - packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp), - - packet::GET_BLOCK_DELTAS => self.get_block_deltas(peer, io, rlp), - packet::BLOCK_DELTAS => self.block_deltas(peer, io, rlp), - - packet::GET_TRANSACTION_PROOFS => self.get_transaction_proofs(peer, io, rlp), - packet::TRANSACTION_PROOFS => self.transaction_proofs(peer, io, rlp), other => { debug!(target: "les", "Disconnecting peer {} on unexpected packet {}", peer, other); diff --git a/ethcore/light/src/request.rs b/ethcore/light/src/request.rs index f5c594970..11b474ee5 100644 --- a/ethcore/light/src/request.rs +++ b/ethcore/light/src/request.rs @@ -51,9 +51,9 @@ pub struct Receipts { pub block_hashes: Vec, } -/// A request for state proofs. +/// A request for a state proof #[derive(Debug, Clone, PartialEq, Eq)] -pub struct StateProofs { +pub struct StateProof { /// Block hash to query state from. pub block: H256, /// Key of the state trie -- corresponds to account hash. @@ -65,6 +65,13 @@ pub struct StateProofs { pub from_level: u32, // could even safely be u8; trie w/ 32-byte key can be at most 64-levels deep. } +/// A request for state proofs. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StateProofs { + /// All the proof requests. + pub requests: Vec, +} + /// A request for contract code. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ContractCodes { @@ -72,9 +79,9 @@ pub struct ContractCodes { pub code_requests: Vec<(H256, H256)>, } -/// A request for header proofs from the Canonical Hash Trie. +/// A request for a header proof from the Canonical Hash Trie. #[derive(Debug, Clone, PartialEq, Eq)] -pub struct HeaderProofs { +pub struct HeaderProof { /// Number of the CHT. pub cht_number: u64, /// Block number requested. @@ -83,6 +90,13 @@ pub struct HeaderProofs { pub from_level: u32, } +/// A request for header proofs from the CHT. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct HeaderProofs { + /// All the proof requests. + pub requests: Vec, +} + /// Kinds of requests. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Kind {