From 664bb2becd9539c4532e540b1d73b128fdd49862 Mon Sep 17 00:00:00 2001 From: Nicolas Gotchac Date: Wed, 21 Nov 2018 20:11:01 +0100 Subject: [PATCH] Adjust requests costs for light client (#9925) * PIP Table Cost relative to average peers instead of max peers * Add tracing in PIP new_cost_table * Update stat peer_count * Use number of leeching peers for Light serve costs * Fix test::light_params_load_share_depends_on_max_peers (wrong type) * Remove (now) useless test * Remove `load_share` from LightParams.Config Prevent div. by 0 * Add LEECHER_COUNT_FACTOR * PR Grumble: u64 to u32 for f64 casting * Prevent u32 overflow for avg_peer_count * Add tests for LightSync::Statistics --- ethcore/light/src/net/mod.rs | 88 +++++++++++++++++++++++++++--- ethcore/light/src/net/tests/mod.rs | 34 +++++++++++- ethcore/sync/src/api.rs | 27 ++------- 3 files changed, 117 insertions(+), 32 deletions(-) diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 590be038e..cec2c6994 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -28,7 +28,7 @@ use parking_lot::{Mutex, RwLock}; use provider::Provider; use request::{Request, NetworkRequests as Requests, Response}; use rlp::{RlpStream, Rlp}; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::fmt; use std::ops::{BitOr, BitAnd, Not}; use std::sync::Arc; @@ -38,7 +38,7 @@ use std::time::{Duration, Instant}; use self::request_credits::{Credits, FlowParams}; use self::context::{Ctx, TickCtx}; use self::error::Punishment; -use self::load_timer::{LoadDistribution, NullStore}; +use self::load_timer::{LoadDistribution, NullStore, MOVING_SAMPLE_SIZE}; use self::request_set::RequestSet; use self::id_guard::IdGuard; @@ -70,6 +70,16 @@ const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5); const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3; const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60); +const STATISTICS_TIMEOUT: TimerToken = 4; +const STATISTICS_INTERVAL: Duration = Duration::from_secs(15); + +/// Maximum load share for the light server +pub const MAX_LIGHTSERV_LOAD: f64 = 0.5; + +/// Factor to multiply leecher count to cater for +/// extra sudden connections (should be >= 1.0) +pub const LEECHER_COUNT_FACTOR: f64 = 1.25; + // minimum interval between updates. const UPDATE_INTERVAL: Duration = Duration::from_millis(5000); @@ -256,18 +266,18 @@ pub trait Handler: Send + Sync { pub struct Config { /// How many stored seconds of credits peers should be able to accumulate. pub max_stored_seconds: u64, - /// How much of the total load capacity each peer should be allowed to take. - pub load_share: f64, + /// The network config median peers (used as default peer count) + pub median_peers: f64, } impl Default for Config { fn default() -> Self { - const LOAD_SHARE: f64 = 1.0 / 25.0; + const MEDIAN_PEERS: f64 = 25.0; const MAX_ACCUMULATED: u64 = 60 * 5; // only charge for 5 minutes. Config { max_stored_seconds: MAX_ACCUMULATED, - load_share: LOAD_SHARE, + median_peers: MEDIAN_PEERS, } } } @@ -335,6 +345,42 @@ mod id_guard { } } +/// Provides various statistics that could +/// be used to compute costs +pub struct Statistics { + /// Samples of peer count + peer_counts: VecDeque, +} + +impl Statistics { + /// Create a new Statistics instance + pub fn new() -> Self { + Statistics { + peer_counts: VecDeque::with_capacity(MOVING_SAMPLE_SIZE), + } + } + + /// Add a new peer_count sample + pub fn add_peer_count(&mut self, peer_count: usize) { + while self.peer_counts.len() >= MOVING_SAMPLE_SIZE { + self.peer_counts.pop_front(); + } + self.peer_counts.push_back(peer_count); + } + + /// Get the average peer count from previous samples. Is always >= 1.0 + pub fn avg_peer_count(&self) -> f64 { + let len = self.peer_counts.len(); + if len == 0 { + return 1.0; + } + let avg = self.peer_counts.iter() + .fold(0, |sum: u32, &v| sum.saturating_add(v as u32)) as f64 + / len as f64; + avg.max(1.0) + } +} + /// This is an implementation of the light ethereum network protocol, abstracted /// over a `Provider` of data and a p2p network. /// @@ -359,6 +405,7 @@ pub struct LightProtocol { req_id: AtomicUsize, sample_store: Box, load_distribution: LoadDistribution, + statistics: RwLock, } impl LightProtocol { @@ -369,9 +416,11 @@ impl LightProtocol { let genesis_hash = provider.chain_info().genesis_hash; let sample_store = params.sample_store.unwrap_or_else(|| Box::new(NullStore)); let load_distribution = LoadDistribution::load(&*sample_store); + // Default load share relative to median peers + let load_share = MAX_LIGHTSERV_LOAD / params.config.median_peers; let flow_params = FlowParams::from_request_times( |kind| load_distribution.expected_time(kind), - params.config.load_share, + load_share, Duration::from_secs(params.config.max_stored_seconds), ); @@ -389,6 +438,7 @@ impl LightProtocol { req_id: AtomicUsize::new(0), sample_store, load_distribution, + statistics: RwLock::new(Statistics::new()), } } @@ -408,6 +458,16 @@ impl LightProtocol { ) } + /// Get the number of active light peers downloading from the + /// node + pub fn leecher_count(&self) -> usize { + let credit_limit = *self.flow_params.read().limit(); + // Count the number of peers that used some credit + self.peers.read().iter() + .filter(|(_, p)| p.lock().local_credits.current() < credit_limit) + .count() + } + /// Make a request to a peer. /// /// Fails on: nonexistent peer, network error, peer not server, @@ -772,12 +832,16 @@ impl LightProtocol { fn begin_new_cost_period(&self, io: &IoContext) { self.load_distribution.end_period(&*self.sample_store); + let avg_peer_count = self.statistics.read().avg_peer_count(); + // Load share relative to average peer count +LEECHER_COUNT_FACTOR% + let load_share = MAX_LIGHTSERV_LOAD / (avg_peer_count * LEECHER_COUNT_FACTOR); let new_params = Arc::new(FlowParams::from_request_times( |kind| self.load_distribution.expected_time(kind), - self.config.load_share, + load_share, Duration::from_secs(self.config.max_stored_seconds), )); *self.flow_params.write() = new_params.clone(); + trace!(target: "pip", "New cost period: avg_peers={} ; cost_table:{:?}", avg_peer_count, new_params.cost_table()); let peers = self.peers.read(); let now = Instant::now(); @@ -797,6 +861,11 @@ impl LightProtocol { peer_info.awaiting_acknowledge = Some((now, new_params.clone())); } } + + fn tick_statistics(&self) { + let leecher_count = self.leecher_count(); + self.statistics.write().add_peer_count(leecher_count); + } } impl LightProtocol { @@ -1099,6 +1168,8 @@ impl NetworkProtocolHandler for LightProtocol { .expect("Error registering sync timer."); io.register_timer(RECALCULATE_COSTS_TIMEOUT, RECALCULATE_COSTS_INTERVAL) .expect("Error registering request timer interval token."); + io.register_timer(STATISTICS_TIMEOUT, STATISTICS_INTERVAL) + .expect("Error registering statistics timer."); } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { @@ -1119,6 +1190,7 @@ impl NetworkProtocolHandler for LightProtocol { TICK_TIMEOUT => self.tick_handlers(&io), PROPAGATE_TIMEOUT => self.propagate_transactions(&io), RECALCULATE_COSTS_TIMEOUT => self.begin_new_cost_period(&io), + STATISTICS_TIMEOUT => self.tick_statistics(), _ => warn!(target: "pip", "received timeout on unknown token {}", timer), } } diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index d1939015c..791315f5f 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -22,9 +22,10 @@ use ethcore::client::{EachBlockWith, TestBlockChainClient}; use ethcore::encoded; use ethcore::ids::BlockId; use ethereum_types::{H256, U256, Address}; -use net::{LightProtocol, Params, packet, Peer}; +use net::{LightProtocol, Params, packet, Peer, Statistics}; use net::context::IoContext; use net::status::{Capabilities, Status}; +use net::load_timer::MOVING_SAMPLE_SIZE; use network::{PeerId, NodeId}; use provider::Provider; use request; @@ -780,3 +781,34 @@ fn get_transaction_index() { let expected = Expect::Respond(packet::RESPONSE, response); proto.handle_packet(&expected, 1, packet::REQUEST, &request_body); } + +#[test] +fn sync_statistics() { + let mut stats = Statistics::new(); + + // Empty set should return 1.0 + assert_eq!(stats.avg_peer_count(), 1.0); + + // Average < 1.0 should return 1.0 + stats.add_peer_count(0); + assert_eq!(stats.avg_peer_count(), 1.0); + + stats = Statistics::new(); + + const N: f64 = 50.0; + + for i in 1..(N as usize + 1) { + stats.add_peer_count(i); + } + + // Compute the average for the sum 1..N + assert_eq!(stats.avg_peer_count(), N * (N + 1.0) / 2.0 / N); + + for _ in 1..(MOVING_SAMPLE_SIZE + 1) { + stats.add_peer_count(40); + } + + // Test that it returns the average of the last + // `MOVING_SAMPLE_SIZE` values + assert_eq!(stats.avg_peer_count(), 40.0); +} diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 8063ebecb..4296a4c2d 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -264,12 +264,10 @@ pub struct EthSync { fn light_params( network_id: u64, - max_peers: u32, + median_peers: f64, pruning_info: PruningInfo, sample_store: Option>, ) -> LightParams { - const MAX_LIGHTSERV_LOAD: f64 = 0.5; - let mut light_params = LightParams { network_id: network_id, config: Default::default(), @@ -282,9 +280,7 @@ fn light_params( sample_store: sample_store, }; - let max_peers = ::std::cmp::max(max_peers, 1); - light_params.config.load_share = MAX_LIGHTSERV_LOAD / max_peers as f64; - + light_params.config.median_peers = median_peers; light_params } @@ -301,9 +297,10 @@ impl EthSync { .map(|mut p| { p.push("request_timings"); light_net::FileStore(p) }) .map(|store| Box::new(store) as Box<_>); + let median_peers = (params.network_config.min_peers + params.network_config.max_peers) as f64 / 2.0; let light_params = light_params( params.config.network_id, - params.network_config.max_peers, + median_peers, pruning_info, sample_store, ); @@ -940,19 +937,3 @@ impl LightSyncProvider for LightSync { Default::default() // TODO } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn light_params_load_share_depends_on_max_peers() { - let pruning_info = PruningInfo { - earliest_chain: 0, - earliest_state: 0, - }; - let params1 = light_params(0, 10, pruning_info.clone(), None); - let params2 = light_params(0, 20, pruning_info, None); - assert!(params1.config.load_share > params2.config.load_share) - } -}