Adjust requests costs for light client (#9925)

* PIP Table Cost relative to average peers instead of max peers

* Add tracing in PIP new_cost_table

* Update stat peer_count

* Use number of leeching peers for Light serve costs

* Fix test::light_params_load_share_depends_on_max_peers (wrong type)

* Remove (now) useless test

* Remove `load_share` from LightParams.Config
Prevent div. by 0

* Add LEECHER_COUNT_FACTOR

* PR Grumble: u64 to u32 for f64 casting

* Prevent u32 overflow for avg_peer_count

* Add tests for LightSync::Statistics
This commit is contained in:
Nicolas Gotchac 2018-11-21 20:11:01 +01:00 committed by Afri Schoedon
parent 8865b95818
commit 664bb2becd
3 changed files with 117 additions and 32 deletions

View File

@ -28,7 +28,7 @@ use parking_lot::{Mutex, RwLock};
use provider::Provider; use provider::Provider;
use request::{Request, NetworkRequests as Requests, Response}; use request::{Request, NetworkRequests as Requests, Response};
use rlp::{RlpStream, Rlp}; use rlp::{RlpStream, Rlp};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt; use std::fmt;
use std::ops::{BitOr, BitAnd, Not}; use std::ops::{BitOr, BitAnd, Not};
use std::sync::Arc; use std::sync::Arc;
@ -38,7 +38,7 @@ use std::time::{Duration, Instant};
use self::request_credits::{Credits, FlowParams}; use self::request_credits::{Credits, FlowParams};
use self::context::{Ctx, TickCtx}; use self::context::{Ctx, TickCtx};
use self::error::Punishment; use self::error::Punishment;
use self::load_timer::{LoadDistribution, NullStore}; use self::load_timer::{LoadDistribution, NullStore, MOVING_SAMPLE_SIZE};
use self::request_set::RequestSet; use self::request_set::RequestSet;
use self::id_guard::IdGuard; use self::id_guard::IdGuard;
@ -70,6 +70,16 @@ const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3; const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3;
const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60); const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);
const STATISTICS_TIMEOUT: TimerToken = 4;
const STATISTICS_INTERVAL: Duration = Duration::from_secs(15);
/// Maximum load share for the light server
pub const MAX_LIGHTSERV_LOAD: f64 = 0.5;
/// Factor to multiply leecher count to cater for
/// extra sudden connections (should be >= 1.0)
pub const LEECHER_COUNT_FACTOR: f64 = 1.25;
// minimum interval between updates. // minimum interval between updates.
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000); const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);
@ -256,18 +266,18 @@ pub trait Handler: Send + Sync {
pub struct Config { pub struct Config {
/// How many stored seconds of credits peers should be able to accumulate. /// How many stored seconds of credits peers should be able to accumulate.
pub max_stored_seconds: u64, pub max_stored_seconds: u64,
/// How much of the total load capacity each peer should be allowed to take. /// The network config median peers (used as default peer count)
pub load_share: f64, pub median_peers: f64,
} }
impl Default for Config { impl Default for Config {
fn default() -> Self { fn default() -> Self {
const LOAD_SHARE: f64 = 1.0 / 25.0; const MEDIAN_PEERS: f64 = 25.0;
const MAX_ACCUMULATED: u64 = 60 * 5; // only charge for 5 minutes. const MAX_ACCUMULATED: u64 = 60 * 5; // only charge for 5 minutes.
Config { Config {
max_stored_seconds: MAX_ACCUMULATED, max_stored_seconds: MAX_ACCUMULATED,
load_share: LOAD_SHARE, median_peers: MEDIAN_PEERS,
} }
} }
} }
@ -335,6 +345,42 @@ mod id_guard {
} }
} }
/// Provides various statistics that could
/// be used to compute costs
pub struct Statistics {
/// Samples of peer count
peer_counts: VecDeque<usize>,
}
impl Statistics {
/// Create a new Statistics instance
pub fn new() -> Self {
Statistics {
peer_counts: VecDeque::with_capacity(MOVING_SAMPLE_SIZE),
}
}
/// Add a new peer_count sample
pub fn add_peer_count(&mut self, peer_count: usize) {
while self.peer_counts.len() >= MOVING_SAMPLE_SIZE {
self.peer_counts.pop_front();
}
self.peer_counts.push_back(peer_count);
}
/// Get the average peer count from previous samples. Is always >= 1.0
pub fn avg_peer_count(&self) -> f64 {
let len = self.peer_counts.len();
if len == 0 {
return 1.0;
}
let avg = self.peer_counts.iter()
.fold(0, |sum: u32, &v| sum.saturating_add(v as u32)) as f64
/ len as f64;
avg.max(1.0)
}
}
/// This is an implementation of the light ethereum network protocol, abstracted /// This is an implementation of the light ethereum network protocol, abstracted
/// over a `Provider` of data and a p2p network. /// over a `Provider` of data and a p2p network.
/// ///
@ -359,6 +405,7 @@ pub struct LightProtocol {
req_id: AtomicUsize, req_id: AtomicUsize,
sample_store: Box<SampleStore>, sample_store: Box<SampleStore>,
load_distribution: LoadDistribution, load_distribution: LoadDistribution,
statistics: RwLock<Statistics>,
} }
impl LightProtocol { impl LightProtocol {
@ -369,9 +416,11 @@ impl LightProtocol {
let genesis_hash = provider.chain_info().genesis_hash; let genesis_hash = provider.chain_info().genesis_hash;
let sample_store = params.sample_store.unwrap_or_else(|| Box::new(NullStore)); let sample_store = params.sample_store.unwrap_or_else(|| Box::new(NullStore));
let load_distribution = LoadDistribution::load(&*sample_store); let load_distribution = LoadDistribution::load(&*sample_store);
// Default load share relative to median peers
let load_share = MAX_LIGHTSERV_LOAD / params.config.median_peers;
let flow_params = FlowParams::from_request_times( let flow_params = FlowParams::from_request_times(
|kind| load_distribution.expected_time(kind), |kind| load_distribution.expected_time(kind),
params.config.load_share, load_share,
Duration::from_secs(params.config.max_stored_seconds), Duration::from_secs(params.config.max_stored_seconds),
); );
@ -389,6 +438,7 @@ impl LightProtocol {
req_id: AtomicUsize::new(0), req_id: AtomicUsize::new(0),
sample_store, sample_store,
load_distribution, load_distribution,
statistics: RwLock::new(Statistics::new()),
} }
} }
@ -408,6 +458,16 @@ impl LightProtocol {
) )
} }
/// Get the number of active light peers downloading from the
/// node
pub fn leecher_count(&self) -> usize {
let credit_limit = *self.flow_params.read().limit();
// Count the number of peers that used some credit
self.peers.read().iter()
.filter(|(_, p)| p.lock().local_credits.current() < credit_limit)
.count()
}
/// Make a request to a peer. /// Make a request to a peer.
/// ///
/// Fails on: nonexistent peer, network error, peer not server, /// Fails on: nonexistent peer, network error, peer not server,
@ -772,12 +832,16 @@ impl LightProtocol {
fn begin_new_cost_period(&self, io: &IoContext) { fn begin_new_cost_period(&self, io: &IoContext) {
self.load_distribution.end_period(&*self.sample_store); self.load_distribution.end_period(&*self.sample_store);
let avg_peer_count = self.statistics.read().avg_peer_count();
// Load share relative to average peer count +LEECHER_COUNT_FACTOR%
let load_share = MAX_LIGHTSERV_LOAD / (avg_peer_count * LEECHER_COUNT_FACTOR);
let new_params = Arc::new(FlowParams::from_request_times( let new_params = Arc::new(FlowParams::from_request_times(
|kind| self.load_distribution.expected_time(kind), |kind| self.load_distribution.expected_time(kind),
self.config.load_share, load_share,
Duration::from_secs(self.config.max_stored_seconds), Duration::from_secs(self.config.max_stored_seconds),
)); ));
*self.flow_params.write() = new_params.clone(); *self.flow_params.write() = new_params.clone();
trace!(target: "pip", "New cost period: avg_peers={} ; cost_table:{:?}", avg_peer_count, new_params.cost_table());
let peers = self.peers.read(); let peers = self.peers.read();
let now = Instant::now(); let now = Instant::now();
@ -797,6 +861,11 @@ impl LightProtocol {
peer_info.awaiting_acknowledge = Some((now, new_params.clone())); peer_info.awaiting_acknowledge = Some((now, new_params.clone()));
} }
} }
fn tick_statistics(&self) {
let leecher_count = self.leecher_count();
self.statistics.write().add_peer_count(leecher_count);
}
} }
impl LightProtocol { impl LightProtocol {
@ -1099,6 +1168,8 @@ impl NetworkProtocolHandler for LightProtocol {
.expect("Error registering sync timer."); .expect("Error registering sync timer.");
io.register_timer(RECALCULATE_COSTS_TIMEOUT, RECALCULATE_COSTS_INTERVAL) io.register_timer(RECALCULATE_COSTS_TIMEOUT, RECALCULATE_COSTS_INTERVAL)
.expect("Error registering request timer interval token."); .expect("Error registering request timer interval token.");
io.register_timer(STATISTICS_TIMEOUT, STATISTICS_INTERVAL)
.expect("Error registering statistics timer.");
} }
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
@ -1119,6 +1190,7 @@ impl NetworkProtocolHandler for LightProtocol {
TICK_TIMEOUT => self.tick_handlers(&io), TICK_TIMEOUT => self.tick_handlers(&io),
PROPAGATE_TIMEOUT => self.propagate_transactions(&io), PROPAGATE_TIMEOUT => self.propagate_transactions(&io),
RECALCULATE_COSTS_TIMEOUT => self.begin_new_cost_period(&io), RECALCULATE_COSTS_TIMEOUT => self.begin_new_cost_period(&io),
STATISTICS_TIMEOUT => self.tick_statistics(),
_ => warn!(target: "pip", "received timeout on unknown token {}", timer), _ => warn!(target: "pip", "received timeout on unknown token {}", timer),
} }
} }

View File

@ -22,9 +22,10 @@ use ethcore::client::{EachBlockWith, TestBlockChainClient};
use ethcore::encoded; use ethcore::encoded;
use ethcore::ids::BlockId; use ethcore::ids::BlockId;
use ethereum_types::{H256, U256, Address}; use ethereum_types::{H256, U256, Address};
use net::{LightProtocol, Params, packet, Peer}; use net::{LightProtocol, Params, packet, Peer, Statistics};
use net::context::IoContext; use net::context::IoContext;
use net::status::{Capabilities, Status}; use net::status::{Capabilities, Status};
use net::load_timer::MOVING_SAMPLE_SIZE;
use network::{PeerId, NodeId}; use network::{PeerId, NodeId};
use provider::Provider; use provider::Provider;
use request; use request;
@ -780,3 +781,34 @@ fn get_transaction_index() {
let expected = Expect::Respond(packet::RESPONSE, response); let expected = Expect::Respond(packet::RESPONSE, response);
proto.handle_packet(&expected, 1, packet::REQUEST, &request_body); proto.handle_packet(&expected, 1, packet::REQUEST, &request_body);
} }
#[test]
fn sync_statistics() {
let mut stats = Statistics::new();
// Empty set should return 1.0
assert_eq!(stats.avg_peer_count(), 1.0);
// Average < 1.0 should return 1.0
stats.add_peer_count(0);
assert_eq!(stats.avg_peer_count(), 1.0);
stats = Statistics::new();
const N: f64 = 50.0;
for i in 1..(N as usize + 1) {
stats.add_peer_count(i);
}
// Compute the average for the sum 1..N
assert_eq!(stats.avg_peer_count(), N * (N + 1.0) / 2.0 / N);
for _ in 1..(MOVING_SAMPLE_SIZE + 1) {
stats.add_peer_count(40);
}
// Test that it returns the average of the last
// `MOVING_SAMPLE_SIZE` values
assert_eq!(stats.avg_peer_count(), 40.0);
}

View File

@ -264,12 +264,10 @@ pub struct EthSync {
fn light_params( fn light_params(
network_id: u64, network_id: u64,
max_peers: u32, median_peers: f64,
pruning_info: PruningInfo, pruning_info: PruningInfo,
sample_store: Option<Box<SampleStore>>, sample_store: Option<Box<SampleStore>>,
) -> LightParams { ) -> LightParams {
const MAX_LIGHTSERV_LOAD: f64 = 0.5;
let mut light_params = LightParams { let mut light_params = LightParams {
network_id: network_id, network_id: network_id,
config: Default::default(), config: Default::default(),
@ -282,9 +280,7 @@ fn light_params(
sample_store: sample_store, sample_store: sample_store,
}; };
let max_peers = ::std::cmp::max(max_peers, 1); light_params.config.median_peers = median_peers;
light_params.config.load_share = MAX_LIGHTSERV_LOAD / max_peers as f64;
light_params light_params
} }
@ -301,9 +297,10 @@ impl EthSync {
.map(|mut p| { p.push("request_timings"); light_net::FileStore(p) }) .map(|mut p| { p.push("request_timings"); light_net::FileStore(p) })
.map(|store| Box::new(store) as Box<_>); .map(|store| Box::new(store) as Box<_>);
let median_peers = (params.network_config.min_peers + params.network_config.max_peers) as f64 / 2.0;
let light_params = light_params( let light_params = light_params(
params.config.network_id, params.config.network_id,
params.network_config.max_peers, median_peers,
pruning_info, pruning_info,
sample_store, sample_store,
); );
@ -940,19 +937,3 @@ impl LightSyncProvider for LightSync {
Default::default() // TODO Default::default() // TODO
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn light_params_load_share_depends_on_max_peers() {
let pruning_info = PruningInfo {
earliest_chain: 0,
earliest_state: 0,
};
let params1 = light_params(0, 10, pruning_info.clone(), None);
let params2 = light_params(0, 20, pruning_info, None);
assert!(params1.config.load_share > params2.config.load_share)
}
}