buffer flow -> request credits

This commit is contained in:
Robert Habermeier 2017-02-23 23:10:29 +01:00
parent f169c8dbb0
commit ddbdfafc05
6 changed files with 97 additions and 97 deletions

View File

@ -44,8 +44,8 @@ pub enum Error {
Rlp(DecoderError),
/// A network error.
Network(NetworkError),
/// Out of buffer.
BufferEmpty,
/// Out of credits.
NoCredits,
/// Unrecognized packet code.
UnrecognizedPacket(u8),
/// Unexpected handshake.
@ -72,7 +72,7 @@ impl Error {
match *self {
Error::Rlp(_) => Punishment::Disable,
Error::Network(_) => Punishment::None,
Error::BufferEmpty => Punishment::Disable,
Error::NoCredits => Punishment::Disable,
Error::UnrecognizedPacket(_) => Punishment::Disconnect,
Error::UnexpectedHandshake => Punishment::Disconnect,
Error::WrongNetwork => Punishment::Disable,
@ -103,7 +103,7 @@ impl fmt::Display for Error {
match *self {
Error::Rlp(ref err) => err.fmt(f),
Error::Network(ref err) => err.fmt(f),
Error::BufferEmpty => write!(f, "Out of buffer"),
Error::NoCredits => write!(f, "Out of request credits"),
Error::UnrecognizedPacket(code) => write!(f, "Unrecognized packet: 0x{:x}", code),
Error::UnexpectedHandshake => write!(f, "Unexpected handshake"),
Error::WrongNetwork => write!(f, "Wrong network"),

View File

@ -37,7 +37,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use provider::Provider;
use request::{self, HashOrNumber, Request};
use self::buffer_flow::{Buffer, FlowParams};
use self::request_credits::{Credits, FlowParams};
use self::context::{Ctx, TickCtx};
use self::error::Punishment;
use self::request_set::RequestSet;
@ -51,7 +51,7 @@ mod request_set;
#[cfg(test)]
mod tests;
pub mod buffer_flow;
pub mod request_credits;
pub use self::error::Error;
pub use self::context::{BasicContext, EventContext, IoContext};
@ -143,10 +143,10 @@ struct PendingPeer {
/// Relevant data to each peer. Not accessible publicly, only `pub` due to
/// limitations of the privacy system.
pub struct Peer {
local_buffer: Buffer, // their buffer relative to us
local_credits: Credits, // their credits relative to us
status: Status,
capabilities: Capabilities,
remote_flow: Option<(Buffer, FlowParams)>,
remote_flow: Option<(Credits, FlowParams)>,
sent_head: H256, // last chain head we've given them.
last_update: SteadyTime,
pending_requests: RequestSet,
@ -155,21 +155,21 @@ pub struct Peer {
impl Peer {
// check the maximum cost of a request, returning an error if there's
// not enough buffer left.
// not enough credits left.
// returns the calculated maximum cost.
fn deduct_max(&mut self, flow_params: &FlowParams, kind: request::Kind, max: usize) -> Result<U256, Error> {
flow_params.recharge(&mut self.local_buffer);
flow_params.recharge(&mut self.local_credits);
let max_cost = flow_params.compute_cost(kind, max);
self.local_buffer.deduct_cost(max_cost)?;
self.local_credits.deduct_cost(max_cost)?;
Ok(max_cost)
}
// refund buffer for a request. returns new buffer amount.
// refund credits for a request. returns new amount of credits.
fn refund(&mut self, flow_params: &FlowParams, amount: U256) -> U256 {
flow_params.refund(&mut self.local_buffer, amount);
flow_params.refund(&mut self.local_credits, amount);
self.local_buffer.current()
self.local_credits.current()
}
}
@ -218,7 +218,7 @@ pub trait Handler: Send + Sync {
pub struct Params {
/// Network id.
pub network_id: u64,
/// Buffer flow parameters.
/// Request credits parameters.
pub flow_params: FlowParams,
/// Initial capabilities.
pub capabilities: Capabilities,
@ -324,14 +324,14 @@ impl LightProtocol {
/// Check the maximum amount of requests of a specific type
/// which a peer would be able to serve. Returns zero if the
/// peer is unknown or has no buffer flow parameters.
/// peer is unknown or has no credit parameters.
fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize {
self.peers.read().get(&peer).and_then(|peer| {
let mut peer = peer.lock();
match peer.remote_flow {
Some((ref mut buf, ref flow)) => {
flow.recharge(buf);
Some(flow.max_amount(&*buf, kind))
Some((ref mut c, ref flow)) => {
flow.recharge(c);
Some(flow.max_amount(&*c, kind))
}
None => None,
}
@ -341,7 +341,7 @@ impl LightProtocol {
/// Make a request to a peer.
///
/// Fails on: nonexistent peer, network error, peer not server,
/// insufficient buffer. Does not check capabilities before sending.
/// insufficient credits. Does not check capabilities before sending.
/// On success, returns a request id which can later be coordinated
/// with an event.
pub fn request_from(&self, io: &IoContext, peer_id: &PeerId, request: Request) -> Result<ReqId, Error> {
@ -350,10 +350,10 @@ impl LightProtocol {
let mut peer = peer.lock();
match peer.remote_flow {
Some((ref mut buf, ref flow)) => {
flow.recharge(buf);
Some((ref mut c, ref flow)) => {
flow.recharge(c);
let max = flow.compute_cost(request.kind(), request.amount());
buf.deduct_cost(max)?;
c.deduct_cost(max)?;
}
None => return Err(Error::NotServer),
}
@ -454,7 +454,7 @@ impl LightProtocol {
// - check whether request kinds match
fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result<IdGuard, Error> {
let req_id = ReqId(raw.val_at(0)?);
let cur_buffer: U256 = raw.val_at(1)?;
let cur_credits: U256 = raw.val_at(1)?;
trace!(target: "les", "pre-verifying response from peer {}, kind={:?}", peer, kind);
@ -470,9 +470,9 @@ impl LightProtocol {
(Some(request), Some(flow_info)) => {
had_req = true;
let &mut (ref mut buf, ref mut flow) = flow_info;
let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit());
buf.update_to(actual_buffer);
let &mut (ref mut c, ref mut flow) = flow_info;
let actual_credits = ::std::cmp::min(cur_credits, *flow.limit());
c.update_to(actual_credits);
if request.kind() != kind {
Some(Error::UnsolicitedResponse)
@ -675,10 +675,10 @@ impl LightProtocol {
return Err(Error::BadProtocolVersion);
}
let remote_flow = flow_params.map(|params| (params.create_buffer(), params));
let remote_flow = flow_params.map(|params| (params.create_credits(), params));
self.peers.write().insert(*peer, Mutex::new(Peer {
local_buffer: self.flow_params.create_buffer(),
local_credits: self.flow_params.create_credits(),
status: status.clone(),
capabilities: capabilities.clone(),
remote_flow: remote_flow,
@ -783,10 +783,10 @@ impl LightProtocol {
let actual_cost = self.flow_params.compute_cost(request::Kind::Headers, response.len());
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::BLOCK_HEADERS, {
let mut stream = RlpStream::new_list(3);
stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
for header in response {
stream.append_raw(&header.into_inner(), 1);
@ -845,11 +845,11 @@ impl LightProtocol {
let actual_cost = self.flow_params.compute_cost(request::Kind::Bodies, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::BLOCK_BODIES, {
let mut stream = RlpStream::new_list(3);
stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
for body in response {
match body {
@ -911,11 +911,11 @@ impl LightProtocol {
let actual_cost = self.flow_params.compute_cost(request::Kind::Receipts, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::RECEIPTS, {
let mut stream = RlpStream::new_list(3);
stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
for receipts in response {
stream.append_raw(&receipts, 1);
@ -985,11 +985,11 @@ impl LightProtocol {
let actual_cost = self.flow_params.compute_cost(request::Kind::StateProofs, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::PROOFS, {
let mut stream = RlpStream::new_list(3);
stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
for proof in response {
stream.append_raw(&proof, 1);
@ -1057,11 +1057,11 @@ impl LightProtocol {
let actual_cost = self.flow_params.compute_cost(request::Kind::Codes, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::CONTRACT_CODES, {
let mut stream = RlpStream::new_list(3);
stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
for code in response {
stream.append(&code);
@ -1130,11 +1130,11 @@ impl LightProtocol {
let actual_cost = self.flow_params.compute_cost(request::Kind::HeaderProofs, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
let cur_credits = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::HEADER_PROOFS, {
let mut stream = RlpStream::new_list(3);
stream.append(&req_id).append(&cur_buffer).begin_list(response.len());
stream.append(&req_id).append(&cur_credits).begin_list(response.len());
for proof in response {
stream.append_raw(&proof, 1);

View File

@ -14,14 +14,14 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! LES buffer flow management.
//! Request credit management.
//!
//! Every request in the LES protocol leads to a reduction
//! of the requester's buffer value as a rate-limiting mechanism.
//! This buffer value will recharge at a set rate.
//! Every request in the light protocol leads to a reduction
//! of the requester's amount of credits as a rate-limiting mechanism.
//! The amount of credits will recharge at a set rate.
//!
//! This module provides an interface for configuration of buffer
//! flow costs and recharge rates.
//! This module provides an interface for configuration of
//! costs and recharge rates of request credits.
//!
//! Current default costs are picked completely arbitrarily, not based
//! on any empirical timings or mathematical models.
@ -38,19 +38,19 @@ use time::{Duration, SteadyTime};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cost(pub U256, pub U256);
/// Buffer value.
/// Credits value.
///
/// Produced and recharged using `FlowParams`.
/// Definitive updates can be made as well -- these will reset the recharge
/// point to the time of the update.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Buffer {
pub struct Credits {
estimate: U256,
recharge_point: SteadyTime,
}
impl Buffer {
/// Get the current buffer value.
impl Credits {
/// Get the current amount of credits..
pub fn current(&self) -> U256 { self.estimate.clone() }
/// Make a definitive update.
@ -61,7 +61,7 @@ impl Buffer {
self.recharge_point = SteadyTime::now();
}
/// Attempt to apply the given cost to the buffer.
/// Attempt to apply the given cost to the amount of credits.
///
/// If successful, the cost will be deducted successfully.
///
@ -69,7 +69,7 @@ impl Buffer {
/// error will be produced.
pub fn deduct_cost(&mut self, cost: U256) -> Result<(), Error> {
match cost > self.estimate {
true => Err(Error::BufferEmpty),
true => Err(Error::NoCredits),
false => {
self.estimate = self.estimate - cost;
Ok(())
@ -165,7 +165,7 @@ impl RlpDecodable for CostTable {
}
}
/// A buffer-flow manager handles costs, recharge, limits
/// Handles costs, recharge, limits of request credits.
#[derive(Debug, Clone, PartialEq)]
pub struct FlowParams {
costs: CostTable,
@ -175,7 +175,7 @@ pub struct FlowParams {
impl FlowParams {
/// Create new flow parameters from a request cost table,
/// buffer limit, and (minimum) rate of recharge.
/// credit limit, and (minimum) rate of recharge.
pub fn new(limit: U256, costs: CostTable, recharge: U256) -> Self {
FlowParams {
costs: costs,
@ -201,7 +201,7 @@ impl FlowParams {
}
}
/// Get a reference to the buffer limit.
/// Get a reference to the credit limit.
pub fn limit(&self) -> &U256 { &self.limit }
/// Get a reference to the cost table.
@ -227,10 +227,10 @@ impl FlowParams {
}
/// Compute the maximum number of costs of a specific kind which can be made
/// with the given buffer.
/// with the given amount of credits
/// Saturates at `usize::max()`. This is not a problem in practice because
/// this amount of requests is already prohibitively large.
pub fn max_amount(&self, buffer: &Buffer, kind: request::Kind) -> usize {
pub fn max_amount(&self, credits: &Credits, kind: request::Kind) -> usize {
use util::Uint;
use std::usize;
@ -243,7 +243,7 @@ impl FlowParams {
request::Kind::HeaderProofs => &self.costs.header_proofs,
};
let start = buffer.current();
let start = credits.current();
if start <= cost.0 {
return 0;
@ -259,36 +259,36 @@ impl FlowParams {
}
}
/// Create initial buffer parameter.
pub fn create_buffer(&self) -> Buffer {
Buffer {
/// Create initial credits..
pub fn create_credits(&self) -> Credits {
Credits {
estimate: self.limit,
recharge_point: SteadyTime::now(),
}
}
/// Recharge the buffer based on time passed since last
/// Recharge the given credits based on time passed since last
/// update.
pub fn recharge(&self, buf: &mut Buffer) {
pub fn recharge(&self, credits: &mut Credits) {
let now = SteadyTime::now();
// recompute and update only in terms of full seconds elapsed
// in order to keep the estimate as an underestimate.
let elapsed = (now - buf.recharge_point).num_seconds();
buf.recharge_point = buf.recharge_point + Duration::seconds(elapsed);
let elapsed = (now - credits.recharge_point).num_seconds();
credits.recharge_point = credits.recharge_point + Duration::seconds(elapsed);
let elapsed: U256 = elapsed.into();
buf.estimate = ::std::cmp::min(self.limit, buf.estimate + (elapsed * self.recharge));
credits.estimate = ::std::cmp::min(self.limit, credits.estimate + (elapsed * self.recharge));
}
/// Refund some buffer which was previously deducted.
/// Refund some credits which were previously deducted.
/// Does not update the recharge timestamp.
pub fn refund(&self, buf: &mut Buffer, refund_amount: U256) {
buf.estimate = buf.estimate + refund_amount;
pub fn refund(&self, credits: &mut Credits, refund_amount: U256) {
credits.estimate = credits.estimate + refund_amount;
if buf.estimate > self.limit {
buf.estimate = self.limit
if credits.estimate > self.limit {
credits.estimate = self.limit
}
}
}
@ -318,20 +318,20 @@ mod tests {
}
#[test]
fn buffer_mechanism() {
fn credits_mechanism() {
use std::thread;
use std::time::Duration;
let flow_params = FlowParams::new(100.into(), Default::default(), 20.into());
let mut buffer = flow_params.create_buffer();
let mut credits = flow_params.create_credits();
assert!(buffer.deduct_cost(101.into()).is_err());
assert!(buffer.deduct_cost(10.into()).is_ok());
assert!(credits.deduct_cost(101.into()).is_err());
assert!(credits.deduct_cost(10.into()).is_ok());
thread::sleep(Duration::from_secs(1));
flow_params.recharge(&mut buffer);
flow_params.recharge(&mut credits);
assert_eq!(buffer.estimate, 100.into());
assert_eq!(credits.estimate, 100.into());
}
}

View File

@ -19,7 +19,7 @@
use rlp::{DecoderError, RlpDecodable, RlpEncodable, RlpStream, Stream, UntrustedRlp, View};
use util::{H256, U256};
use super::buffer_flow::FlowParams;
use super::request_credits::FlowParams;
// recognized handshake/announcement keys.
// unknown keys are to be skipped, known keys have a defined order.
@ -207,7 +207,7 @@ impl Capabilities {
/// Attempt to parse a handshake message into its three parts:
/// - chain status
/// - serving capabilities
/// - buffer flow parameters
/// - request credit parameters
pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, Option<FlowParams>), DecoderError> {
let mut parser = Parser {
pos: 0,
@ -300,7 +300,7 @@ pub struct Announcement {
pub serve_chain_since: Option<u64>,
/// optional new transaction-relay capability. false means "no change"
pub tx_relay: bool,
// TODO: changes in buffer flow?
// TODO: changes in request credits.
}
/// Parse an announcement.
@ -372,7 +372,7 @@ pub fn write_announcement(announcement: &Announcement) -> Vec<u8> {
#[cfg(test)]
mod tests {
use super::*;
use super::super::buffer_flow::FlowParams;
use super::super::request_credits::FlowParams;
use util::{U256, H256, FixedHash};
use rlp::{RlpStream, Stream ,UntrustedRlp, View};

View File

@ -24,7 +24,7 @@ use ethcore::transaction::PendingTransaction;
use ethcore::encoded;
use network::{PeerId, NodeId};
use net::buffer_flow::FlowParams;
use net::request_credits::FlowParams;
use net::context::IoContext;
use net::status::{Capabilities, Status, write_handshake};
use net::{encode_request, LightProtocol, Params, packet, Peer};
@ -203,7 +203,7 @@ fn genesis_mismatch() {
}
#[test]
fn buffer_overflow() {
fn credit_overflow() {
let flow_params = make_flow_params();
let capabilities = capabilities();
@ -268,11 +268,11 @@ fn get_block_headers() {
let headers: Vec<_> = (0..10).map(|i| provider.client.block_header(BlockId::Number(i + 1)).unwrap()).collect();
assert_eq!(headers.len(), 10);
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Headers, 10);
let new_creds = *flow_params.limit() - flow_params.compute_cost(request::Kind::Headers, 10);
let mut response_stream = RlpStream::new_list(3);
response_stream.append(&req_id).append(&new_buf).begin_list(10);
response_stream.append(&req_id).append(&new_creds).begin_list(10);
for header in headers {
response_stream.append_raw(&header.into_inner(), 1);
}
@ -317,11 +317,11 @@ fn get_block_bodies() {
let bodies: Vec<_> = (0..10).map(|i| provider.client.block_body(BlockId::Number(i + 1)).unwrap()).collect();
assert_eq!(bodies.len(), 10);
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Bodies, 10);
let new_creds = *flow_params.limit() - flow_params.compute_cost(request::Kind::Bodies, 10);
let mut response_stream = RlpStream::new_list(3);
response_stream.append(&req_id).append(&new_buf).begin_list(10);
response_stream.append(&req_id).append(&new_creds).begin_list(10);
for body in bodies {
response_stream.append_raw(&body.into_inner(), 1);
}
@ -371,11 +371,11 @@ fn get_block_receipts() {
.map(|hash| provider.client.block_receipts(hash).unwrap())
.collect();
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Receipts, receipts.len());
let new_creds = *flow_params.limit() - flow_params.compute_cost(request::Kind::Receipts, receipts.len());
let mut response_stream = RlpStream::new_list(3);
response_stream.append(&req_id).append(&new_buf).begin_list(receipts.len());
response_stream.append(&req_id).append(&new_creds).begin_list(receipts.len());
for block_receipts in receipts {
response_stream.append_raw(&block_receipts, 1);
}
@ -420,11 +420,11 @@ fn get_state_proofs() {
vec![::util::sha3::SHA3_NULL_RLP.to_vec()],
];
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::StateProofs, 2);
let new_creds = *flow_params.limit() - flow_params.compute_cost(request::Kind::StateProofs, 2);
let mut response_stream = RlpStream::new_list(3);
response_stream.append(&req_id).append(&new_buf).begin_list(2);
response_stream.append(&req_id).append(&new_creds).begin_list(2);
for proof in proofs {
response_stream.begin_list(proof.len());
for node in proof {
@ -472,11 +472,11 @@ fn get_contract_code() {
key2.iter().chain(key2.iter()).cloned().collect(),
];
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Codes, 2);
let new_creds = *flow_params.limit() - flow_params.compute_cost(request::Kind::Codes, 2);
let mut response_stream = RlpStream::new_list(3);
response_stream.append(&req_id).append(&new_buf).begin_list(2);
response_stream.append(&req_id).append(&new_creds).begin_list(2);
for code in codes {
response_stream.append(&code);
}
@ -515,10 +515,10 @@ fn id_guard() {
pending_requests.insert(req_id_2, req, ::time::SteadyTime::now());
proto.peers.write().insert(peer_id, ::util::Mutex::new(Peer {
local_buffer: flow_params.create_buffer(),
local_credits: flow_params.create_credits(),
status: status(provider.client.chain_info()),
capabilities: capabilities.clone(),
remote_flow: Some((flow_params.create_buffer(), flow_params)),
remote_flow: Some((flow_params.create_credits(), flow_params)),
sent_head: provider.client.chain_info().best_block_hash,
last_update: ::time::SteadyTime::now(),
pending_requests: pending_requests,

View File

@ -27,7 +27,7 @@ use ethcore::spec::Spec;
use io::IoChannel;
use light::client::Client as LightClient;
use light::net::{LightProtocol, IoContext, Capabilities, Params as LightParams};
use light::net::buffer_flow::FlowParams;
use light::net::request_credits::FlowParams;
use network::{NodeId, PeerId};
use util::RwLock;