diff --git a/Cargo.lock b/Cargo.lock index 5b3aad881..cc02a63e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,16 @@ dependencies = [ "rustc_version 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "bincode" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "bit-set" version = "0.2.0" @@ -504,6 +514,7 @@ dependencies = [ name = "ethcore-light" version = "1.7.0" dependencies = [ + "bincode 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore 1.7.0", "ethcore-devtools 1.7.0", "ethcore-io 1.7.0", @@ -516,6 +527,8 @@ dependencies = [ "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.2.0", + "serde 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "stats 0.1.0", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2282,12 +2295,17 @@ name = "serde" version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "serde" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "serde_codegen_internals" version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "syn 0.11.4 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2297,7 +2315,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "quote 0.3.10 (registry+https://github.com/rust-lang/crates.io-index)", "serde_codegen_internals 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.11.4 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "serde_derive" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "quote 0.3.10 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive_internals 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "serde_derive_internals" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", + "synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2405,10 +2442,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "syn" -version = "0.11.4" +version = "0.11.11" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "quote 0.3.10 (registry+https://github.com/rust-lang/crates.io-index)", + "synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "synom" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ "unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2859,6 +2905,7 @@ dependencies = [ "checksum base-x 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2f59103b47307f76e03bef1633aec7fa9e29bfb5aa6daf5a334f94233c71f6c1" "checksum base32 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1b9605ba46d61df0410d8ac686b0007add8172eba90e8e909c347856fe794d8c" "checksum bigint 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "5d1b3ef6756498df0e2c6bb67c065f4154d0ecd721eb5b3c3f865c8012b9fd74" +"checksum bincode 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e103c8b299b28a9c6990458b7013dc4a8356a9b854c51b9883241f5866fac36e" "checksum bit-set 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e6e1e6fb1c9e3d6fcdec57216a74eaa03e41f52a22f13a16438251d8e88b89da" "checksum bit-set 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9bf6104718e80d7b26a68fdbacff3481cfc05df670821affc7e9cbc1884400c" "checksum bit-vec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "5b97c2c8e8bbb4251754f559df8af22fb264853c7d009084a576cdf12565089d" @@ -3025,8 +3072,11 @@ dependencies = [ "checksum semver 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a3186ec9e65071a2095434b1f5bb24838d4e8e130f584c790f6033c79943537" "checksum semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" "checksum serde 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)" = "0ae9a3c8b07c09dbe43022486d55a18c629a0618d2241e49829aaef9b6d862f9" +"checksum serde 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "991ef6be409a3b7a46cb9ee701d86156ce851825c65dbee7f16dbd5c4e7e2d47" "checksum serde_codegen_internals 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c3172bf2940b975c0e4f6ab42a511c0a4407d4f46ccef87a9d3615db5c26fa96" "checksum serde_derive 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ecc6e0379ca933ece58302d2d3034443f06fbf38fd535857c1dc516195cbc3bf" +"checksum serde_derive 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "9fd81eef9f0b4ec341b11095335b6a4b28ed85581b12dd27585dee1529df35e0" +"checksum serde_derive_internals 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)" = "021c338d22c7e30f957a6ab7e388cb6098499dda9fd4ba1661ee074ca7a180d1" "checksum serde_json 0.9.5 (registry+https://github.com/rust-lang/crates.io-index)" = "cf37ce931677e98b4fa5e6469aaa3ab4b6228309ea33b1b22d3ec055adfc4515" "checksum serde_urlencoded 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a81f15da4b9780e1524697f73b09076b6e42298ef673bead9ca8f848b334ef84" "checksum sha1 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cc30b1e1e8c40c121ca33b86c23308a090d19974ef001b4bf6e61fd1a0fb095c" @@ -3041,7 +3091,8 @@ dependencies = [ "checksum spmc 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "93bdab61c1a413e591c4d17388ffa859eaff2df27f1e13a5ec8b716700605adf" "checksum stable_deref_trait 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "15132e0e364248108c5e2c02e3ab539be8d6f5d52a01ca9bbf27ed657316f02b" "checksum strsim 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b4d15c810519a91cf877e7e36e63fe068815c678181439f2f29e2562147c3694" -"checksum syn 0.11.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f4f94368aae82bb29656c98443a7026ca931a659e8d19dcdc41d6e273054e820" +"checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad" +"checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" "checksum syntex 0.58.0 (registry+https://github.com/rust-lang/crates.io-index)" = "35f3cc9d446323ef8fefad933b65cd6de271d29fa14a2e9d036a084770c6d6d5" "checksum syntex_errors 0.58.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3af03823ea45d420dd2c1a44bb074e13ea55f9b99afe960fd58eb4069b7f6cad" "checksum syntex_pos 0.58.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1e502a4a904d9f37cf975dbdbb0b08f2d111322f6792bda6eb095b4112c9a24b" diff --git a/ethcore/light/Cargo.toml b/ethcore/light/Cargo.toml index 78210904e..1f8d48ac0 100644 --- a/ethcore/light/Cargo.toml +++ b/ethcore/light/Cargo.toml @@ -24,6 +24,9 @@ smallvec = "0.3.1" futures = "0.1" rand = "0.3" itertools = "0.5" +bincode = "0.8.0" +serde = "1.0" +serde_derive = "1.0" stats = { path = "../../util/stats" } [features] diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index 5e970b837..18908a3f2 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -60,20 +60,25 @@ pub use self::provider::Provider; pub use self::transaction_queue::TransactionQueue; pub use types::request as request; +#[macro_use] +extern crate serde_derive; + #[macro_use] extern crate log; -extern crate ethcore; -extern crate ethcore_util as util; -extern crate ethcore_network as network; +extern crate bincode; extern crate ethcore_io as io; -extern crate rlp; -extern crate smallvec; -extern crate time; +extern crate ethcore_network as network; +extern crate ethcore_util as util; +extern crate ethcore; extern crate futures; -extern crate rand; extern crate itertools; +extern crate rand; +extern crate rlp; +extern crate serde; +extern crate smallvec; extern crate stats; +extern crate time; #[cfg(feature = "ipc")] extern crate ethcore_ipc as ipc; diff --git a/ethcore/light/src/net/load_timer.rs b/ethcore/light/src/net/load_timer.rs new file mode 100644 index 000000000..190747d40 --- /dev/null +++ b/ethcore/light/src/net/load_timer.rs @@ -0,0 +1,279 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Request load timer and distribution manager. +//! +//! This uses empirical samples of the length of time taken to respond +//! to requests in order to inform request credit costs. +//! +//! The average request time is determined by an exponential moving average +//! of the mean request times during the last `MOVING_SAMPLE_SIZE` time periods of +//! length `TIME_PERIOD_MS`, with the exception that time periods where no data is +//! gathered are excluded. + +use std::collections::{HashMap, VecDeque}; +use std::fs::File; +use std::path::PathBuf; + +use request::{CompleteRequest, Kind}; + +use bincode; +use time; +use util::{Uint, RwLock, Mutex}; + +/// Number of time periods samples should be kept for. +pub const MOVING_SAMPLE_SIZE: usize = 256; + +/// Stores rolling load timer samples. +// TODO: switch to bigint if possible (FP casts aren't available) +pub trait SampleStore: Send + Sync { + /// Load samples. + fn load(&self) -> HashMap>; + + /// Store all samples. + fn store(&self, samples: &HashMap>); +} + +// get a hardcoded, arbitrarily determined (but intended overestimate) +// of the time in nanoseconds to serve a request of the given kind. +// +// TODO: seed this with empirical data. +fn hardcoded_serve_time(kind: Kind) -> u64 { + match kind { + Kind::Headers => 500_000, + Kind::HeaderProof => 500_000, + Kind::Receipts => 1_000_000, + Kind::Body => 1_000_000, + Kind::Account => 1_500_000, + Kind::Storage => 2_000_000, + Kind::Code => 1_500_000, + Kind::Execution => 250, // per gas. + } +} + +/// A no-op store. +pub struct NullStore; + +impl SampleStore for NullStore { + fn load(&self) -> HashMap> { HashMap::new() } + fn store(&self, _samples: &HashMap>) { } +} + +/// Request load distributions. +pub struct LoadDistribution { + active_period: RwLock>>, + samples: RwLock>>, +} + +impl LoadDistribution { + /// Load rolling samples from the given store. + pub fn load(store: &SampleStore) -> Self { + let mut samples = store.load(); + + for kind_samples in samples.values_mut() { + while kind_samples.len() > MOVING_SAMPLE_SIZE { + kind_samples.pop_front(); + } + } + + LoadDistribution { + active_period: RwLock::new(HashMap::new()), + samples: RwLock::new(samples), + } + } + + /// Begin a timer. + pub fn begin_timer<'a>(&'a self, req: &CompleteRequest) -> LoadTimer<'a> { + let kind = req.kind(); + let n = match *req { + CompleteRequest::Headers(ref req) => req.max, + CompleteRequest::Execution(ref req) => req.gas.low_u64(), + _ => 1, + }; + + LoadTimer { + start: time::precise_time_ns(), + n: n, + dist: self, + kind: kind, + } + } + + /// Calculate EMA of load in nanoseconds for a specific request kind. + /// If there is no data for the given request kind, no EMA will be calculated, + /// but a hardcoded time will be returned. + pub fn expected_time_ns(&self, kind: Kind) -> u64 { + let samples = self.samples.read(); + samples.get(&kind).and_then(|s| { + if s.len() == 0 { return None } + + let alpha: f64 = 1f64 / s.len() as f64; + let start = s.front().expect("length known to be non-zero; qed").clone(); + let ema = s.iter().skip(1).fold(start as f64, |a, &c| { + (alpha * c as f64) + ((1.0 - alpha) * a) + }); + + Some(ema as u64) + }).unwrap_or_else(move || hardcoded_serve_time(kind)) + } + + /// End the current time period. Provide a store to + pub fn end_period(&self, store: &SampleStore) { + let active_period = self.active_period.read(); + let mut samples = self.samples.write(); + + for (&kind, set) in active_period.iter() { + let (elapsed, n) = ::std::mem::replace(&mut *set.lock(), (0, 0)); + if n == 0 { continue } + + let kind_samples = samples.entry(kind) + .or_insert_with(|| VecDeque::with_capacity(MOVING_SAMPLE_SIZE)); + + if kind_samples.len() == MOVING_SAMPLE_SIZE { kind_samples.pop_front(); } + kind_samples.push_back(elapsed / n); + } + + store.store(&*samples); + } + + fn update(&self, kind: Kind, elapsed: u64, n: u64) { + macro_rules! update_counters { + ($counters: expr) => { + $counters.0 = $counters.0.saturating_add(elapsed); + $counters.1 = $counters.1.saturating_add(n); + } + }; + + { + let set = self.active_period.read(); + if let Some(counters) = set.get(&kind) { + let mut counters = counters.lock(); + update_counters!(counters); + return; + } + } + + let mut set = self.active_period.write(); + let counters = set + .entry(kind) + .or_insert_with(|| Mutex::new((0, 0))); + + update_counters!(counters.get_mut()); + } +} + +/// A timer for a single request. +/// On drop, this will update the distribution. +pub struct LoadTimer<'a> { + start: u64, + n: u64, + dist: &'a LoadDistribution, + kind: Kind, +} + +impl<'a> Drop for LoadTimer<'a> { + fn drop(&mut self) { + let elapsed = time::precise_time_ns() - self.start; + self.dist.update(self.kind, elapsed, self.n); + } +} + +/// A store which writes directly to a file. +pub struct FileStore(pub PathBuf); + +impl SampleStore for FileStore { + fn load(&self) -> HashMap> { + File::open(&self.0) + .map_err(|e| Box::new(bincode::ErrorKind::IoError(e))) + .and_then(|mut file| bincode::deserialize_from(&mut file, bincode::Infinite)) + .unwrap_or_else(|_| HashMap::new()) + } + + fn store(&self, samples: &HashMap>) { + let res = File::create(&self.0) + .map_err(|e| Box::new(bincode::ErrorKind::IoError(e))) + .and_then(|mut file| bincode::serialize_into(&mut file, samples, bincode::Infinite)); + + if let Err(e) = res { + warn!(target: "pip", "Error writing light request timing samples to file: {}", e); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use request::Kind; + + #[test] + fn hardcoded_before_data() { + let dist = LoadDistribution::load(&NullStore); + assert_eq!(dist.expected_time_ns(Kind::Headers), hardcoded_serve_time(Kind::Headers)); + + dist.update(Kind::Headers, 100_000, 100); + dist.end_period(&NullStore); + + assert_eq!(dist.expected_time_ns(Kind::Headers), 1000); + } + + #[test] + fn moving_average() { + let dist = LoadDistribution::load(&NullStore); + + let mut sum = 0; + + for (i, x) in (0..10).map(|x| x * 10_000).enumerate() { + dist.update(Kind::Headers, x, 1); + dist.end_period(&NullStore); + + sum += x; + if i == 0 { continue } + + let moving_average = dist.expected_time_ns(Kind::Headers); + + // should be weighted below the maximum entry. + let arith_average = (sum as f64 / (i + 1) as f64) as u64; + assert!(moving_average < x); + + // when there are only 2 entries, they should be equal due to choice of + // ALPHA = 1/N. + // otherwise, the weight should be below the arithmetic mean because the much + // smaller previous values are discounted less. + if i == 1 { + assert_eq!(moving_average, arith_average); + } else { + assert!(moving_average < arith_average) + } + } + } + + #[test] + fn file_store() { + let path = ::devtools::RandomTempPath::new(); + let store = FileStore(path.as_path().clone()); + + let mut samples = store.load(); + assert!(samples.is_empty()); + samples.insert(Kind::Headers, vec![5, 2, 7, 2, 2, 4].into()); + samples.insert(Kind::Execution, vec![1, 1, 100, 250].into()); + + store.store(&samples); + + let dup = store.load(); + + assert_eq!(samples, dup); + } +} diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 798f68fe5..6ab5903df 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -38,11 +38,13 @@ use request::{Request, NetworkRequests as Requests, Response}; use self::request_credits::{Credits, FlowParams}; use self::context::{Ctx, TickCtx}; use self::error::Punishment; +use self::load_timer::{LoadDistribution, NullStore}; use self::request_set::RequestSet; use self::id_guard::IdGuard; mod context; mod error; +mod load_timer; mod status; mod request_set; @@ -51,8 +53,9 @@ mod tests; pub mod request_credits; -pub use self::error::Error; pub use self::context::{BasicContext, EventContext, IoContext}; +pub use self::error::Error; +pub use self::load_timer::{SampleStore, FileStore}; pub use self::status::{Status, Capabilities, Announcement}; const TIMEOUT: TimerToken = 0; @@ -64,6 +67,9 @@ const TICK_TIMEOUT_INTERVAL_MS: u64 = 5000; const PROPAGATE_TIMEOUT: TimerToken = 2; const PROPAGATE_TIMEOUT_INTERVAL_MS: u64 = 5000; +const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3; +const RECALCULATE_COSTS_INTERVAL_MS: u64 = 60 * 60 * 1000; + // minimum interval between updates. const UPDATE_INTERVAL_MS: i64 = 5000; @@ -88,13 +94,18 @@ mod packet { pub const REQUEST: u8 = 0x02; pub const RESPONSE: u8 = 0x03; + // request credits update and acknowledgement. + pub const UPDATE_CREDITS: u8 = 0x04; + pub const ACKNOWLEDGE_UPDATE: u8 = 0x05; + // relay transactions to peers. - pub const SEND_TRANSACTIONS: u8 = 0x04; + pub const SEND_TRANSACTIONS: u8 = 0x06; } // timeouts for different kinds of requests. all values are in milliseconds. mod timeout { pub const HANDSHAKE: i64 = 2500; + pub const ACKNOWLEDGE_UPDATE: i64 = 5000; pub const BASE: i64 = 1500; // base timeout for packet. // timeouts per request within packet. @@ -141,6 +152,9 @@ pub struct Peer { pending_requests: RequestSet, failed_requests: Vec, propagated_transactions: HashSet, + skip_update: bool, + local_flow: Arc, + awaiting_acknowledge: Option<(SteadyTime, Arc)>, } /// A light protocol event handler. @@ -176,14 +190,36 @@ pub trait Handler: Send + Sync { fn on_abort(&self) { } } -/// Protocol parameters. +/// Configuration. +pub struct Config { + /// How many stored seconds of credits peers should be able to accumulate. + pub max_stored_seconds: u64, + /// How much of the total load capacity each peer should be allowed to take. + pub load_share: f64, +} + +impl Default for Config { + fn default() -> Self { + const LOAD_SHARE: f64 = 1.0 / 25.0; + const MAX_ACCUMULATED: u64 = 60 * 5; // only charge for 5 minutes. + + Config { + max_stored_seconds: MAX_ACCUMULATED, + load_share: LOAD_SHARE, + } + } +} + +/// Protocol initialization parameters. pub struct Params { /// Network id. pub network_id: u64, - /// Request credits parameters. - pub flow_params: FlowParams, + /// Config. + pub config: Config, /// Initial capabilities. pub capabilities: Capabilities, + /// The sample store (`None` if data shouldn't persist between runs). + pub sample_store: Option>, } /// Type alias for convenience. @@ -249,14 +285,17 @@ mod id_guard { // on the peers, only one peer may be held at a time. pub struct LightProtocol { provider: Arc, + config: Config, genesis_hash: H256, network_id: u64, pending_peers: RwLock>, peers: RwLock, capabilities: RwLock, - flow_params: FlowParams, // assumed static and same for every peer. + flow_params: RwLock>, handlers: Vec>, req_id: AtomicUsize, + sample_store: Box, + load_distribution: LoadDistribution, } impl LightProtocol { @@ -265,16 +304,27 @@ impl LightProtocol { debug!(target: "pip", "Initializing light protocol handler"); let genesis_hash = provider.chain_info().genesis_hash; + let sample_store = params.sample_store.unwrap_or_else(|| Box::new(NullStore)); + let load_distribution = LoadDistribution::load(&*sample_store); + let flow_params = FlowParams::from_request_times( + |kind| load_distribution.expected_time_ns(kind), + params.config.load_share, + params.config.max_stored_seconds, + ); + LightProtocol { provider: provider, + config: params.config, genesis_hash: genesis_hash, network_id: params.network_id, pending_peers: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()), capabilities: RwLock::new(params.capabilities), - flow_params: params.flow_params, + flow_params: RwLock::new(Arc::new(flow_params)), handlers: Vec::new(), req_id: AtomicUsize::new(0), + sample_store: sample_store, + load_distribution: load_distribution, } } @@ -422,8 +472,9 @@ impl LightProtocol { let res = match peers.get(peer) { Some(peer_info) => { let mut peer_info = peer_info.lock(); + let peer_info: &mut Peer = &mut *peer_info; let req_info = peer_info.pending_requests.remove(&req_id, SteadyTime::now()); - let cumulative_cost = peer_info.pending_requests.cumulative_cost(); + let last_batched = peer_info.pending_requests.is_empty(); let flow_info = peer_info.remote_flow.as_mut(); match (req_info, flow_info) { @@ -431,11 +482,14 @@ impl LightProtocol { let &mut (ref mut c, ref mut flow) = flow_info; // only update if the cumulative cost of the request set is zero. - if cumulative_cost == 0.into() { + // and this response wasn't from before request costs were updated. + if !peer_info.skip_update && last_batched { let actual_credits = ::std::cmp::min(cur_credits, *flow.limit()); c.update_to(actual_credits); } + if last_batched { peer_info.skip_update = false } + Ok(()) } (None, _) => Err(Error::UnsolicitedResponse), @@ -464,6 +518,9 @@ impl LightProtocol { packet::REQUEST => self.request(peer, io, rlp), packet::RESPONSE => self.response(peer, io, rlp), + packet::UPDATE_CREDITS => self.update_credits(peer, io, rlp), + packet::ACKNOWLEDGE_UPDATE => self.acknowledge_update(peer, io, rlp), + packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp), other => { @@ -497,13 +554,22 @@ impl LightProtocol { } } - // request timeouts + // request and update ack timeouts + let ack_duration = Duration::milliseconds(timeout::ACKNOWLEDGE_UPDATE); { for (peer_id, peer) in self.peers.read().iter() { - if peer.lock().pending_requests.check_timeout(now) { + let peer = peer.lock(); + if peer.pending_requests.check_timeout(now) { debug!(target: "pip", "Peer {} request timeout", peer_id); io.disconnect_peer(*peer_id); } + + if let Some((ref start, _)) = peer.awaiting_acknowledge { + if *start + ack_duration <= now { + debug!(target: "pip", "Peer {} update acknowledgement timeout", peer_id); + io.disconnect_peer(*peer_id); + } + } } } } @@ -574,7 +640,8 @@ impl LightProtocol { }; let capabilities = self.capabilities.read().clone(); - let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params)); + let local_flow = self.flow_params.read(); + let status_packet = status::write_handshake(&status, &capabilities, Some(&**local_flow)); self.pending_peers.write().insert(*peer, PendingPeer { sent_head: chain_info.best_block_hash, @@ -628,6 +695,35 @@ impl LightProtocol { }) } } + + fn begin_new_cost_period(&self, io: &IoContext) { + self.load_distribution.end_period(&*self.sample_store); + + let new_params = Arc::new(FlowParams::from_request_times( + |kind| self.load_distribution.expected_time_ns(kind), + self.config.load_share, + self.config.max_stored_seconds, + )); + *self.flow_params.write() = new_params.clone(); + + let peers = self.peers.read(); + let now = SteadyTime::now(); + + let packet_body = { + let mut stream = RlpStream::new_list(3); + stream.append(new_params.limit()) + .append(new_params.recharge_rate()) + .append(new_params.cost_table()); + stream.out() + }; + + for (peer_id, peer_info) in peers.iter() { + let mut peer_info = peer_info.lock(); + + io.send(*peer_id, packet::UPDATE_CREDITS, packet_body.clone()); + peer_info.awaiting_acknowledge = Some((now.clone(), new_params.clone())); + } + } } impl LightProtocol { @@ -653,9 +749,10 @@ impl LightProtocol { } let remote_flow = flow_params.map(|params| (params.create_credits(), params)); + let local_flow = self.flow_params.read().clone(); self.peers.write().insert(*peer, Mutex::new(Peer { - local_credits: self.flow_params.create_credits(), + local_credits: local_flow.create_credits(), status: status.clone(), capabilities: capabilities.clone(), remote_flow: remote_flow, @@ -664,6 +761,9 @@ impl LightProtocol { pending_requests: RequestSet::default(), failed_requests: Vec::new(), propagated_transactions: HashSet::new(), + skip_update: false, + local_flow: local_flow, + awaiting_acknowledge: None, })); for handler in &self.handlers { @@ -739,6 +839,7 @@ impl LightProtocol { } }; let mut peer = peer.lock(); + let peer: &mut Peer = &mut *peer; let req_id: u64 = raw.val_at(0)?; let mut request_builder = RequestBuilder::default(); @@ -746,12 +847,13 @@ impl LightProtocol { trace!(target: "pip", "Received requests (id: {}) from peer {}", req_id, peer_id); // deserialize requests, check costs and request validity. - self.flow_params.recharge(&mut peer.local_credits); + peer.local_flow.recharge(&mut peer.local_credits); - peer.local_credits.deduct_cost(self.flow_params.base_cost())?; + peer.local_credits.deduct_cost(peer.local_flow.base_cost())?; for request_rlp in raw.at(1)?.iter().take(MAX_REQUESTS) { let request: Request = request_rlp.as_val()?; - peer.local_credits.deduct_cost(self.flow_params.compute_cost(&request))?; + let cost = peer.local_flow.compute_cost(&request); + peer.local_credits.deduct_cost(cost)?; request_builder.push(request).map_err(|_| Error::BadBackReference)?; } @@ -761,6 +863,7 @@ impl LightProtocol { // respond to all requests until one fails. let responses = requests.respond_to_all(|complete_req| { + let _timer = self.load_distribution.begin_timer(&complete_req); match complete_req { CompleteRequest::Headers(req) => self.provider.block_headers(req).map(Response::Headers), CompleteRequest::HeaderProof(req) => self.provider.header_proof(req).map(Response::HeaderProof), @@ -804,6 +907,60 @@ impl LightProtocol { Ok(()) } + // handle an update of request credits parameters. + fn update_credits(&self, peer_id: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { + let peers = self.peers.read(); + + let peer = peers.get(peer_id).ok_or(Error::UnknownPeer)?; + let mut peer = peer.lock(); + + trace!(target: "pip", "Received an update to request credit params from peer {}", peer_id); + + { + let &mut (ref mut credits, ref mut old_params) = peer.remote_flow.as_mut().ok_or(Error::NotServer)?; + old_params.recharge(credits); + + let new_params = FlowParams::new( + raw.val_at(0)?, // limit + raw.val_at(2)?, // cost table + raw.val_at(1)?, // recharge. + ); + + // preserve ratio of current : limit when updating params. + credits.maintain_ratio(*old_params.limit(), *new_params.limit()); + *old_params = new_params; + } + + // set flag to true when there is an in-flight request + // corresponding to old flow params. + if !peer.pending_requests.is_empty() { + peer.skip_update = true; + } + + // let peer know we've acknowledged the update. + io.respond(packet::ACKNOWLEDGE_UPDATE, Vec::new()); + Ok(()) + } + + // handle an acknowledgement of request credits update. + fn acknowledge_update(&self, peer_id: &PeerId, _io: &IoContext, _raw: UntrustedRlp) -> Result<(), Error> { + let peers = self.peers.read(); + let peer = peers.get(peer_id).ok_or(Error::UnknownPeer)?; + let mut peer = peer.lock(); + + trace!(target: "pip", "Received an acknowledgement for new request credit params from peer {}", peer_id); + + let (_, new_params) = match peer.awaiting_acknowledge.take() { + Some(x) => x, + None => return Err(Error::UnsolicitedResponse), + }; + + let old_limit = *peer.local_flow.limit(); + peer.local_credits.maintain_ratio(old_limit, *new_params.limit()); + peer.local_flow = new_params; + Ok(()) + } + // Receive a set of transactions to relay. fn relay_transactions(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { const MAX_TRANSACTIONS: usize = 256; @@ -850,6 +1007,8 @@ impl NetworkProtocolHandler for LightProtocol { .expect("Error registering sync timer."); io.register_timer(PROPAGATE_TIMEOUT, PROPAGATE_TIMEOUT_INTERVAL_MS) .expect("Error registering sync timer."); + io.register_timer(RECALCULATE_COSTS_TIMEOUT, RECALCULATE_COSTS_INTERVAL_MS) + .expect("Error registering request timer interval token."); } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { @@ -869,6 +1028,7 @@ impl NetworkProtocolHandler for LightProtocol { TIMEOUT => self.timeout_check(io), TICK_TIMEOUT => self.tick_handlers(io), PROPAGATE_TIMEOUT => self.propagate_transactions(io), + RECALCULATE_COSTS_TIMEOUT => self.begin_new_cost_period(io), _ => warn!(target: "pip", "received timeout on unknown token {}", timer), } } diff --git a/ethcore/light/src/net/request_credits.rs b/ethcore/light/src/net/request_credits.rs index 0c94d7bc9..330576f9c 100644 --- a/ethcore/light/src/net/request_credits.rs +++ b/ethcore/light/src/net/request_credits.rs @@ -56,6 +56,11 @@ impl Credits { self.recharge_point = SteadyTime::now(); } + /// Maintain ratio to current limit against an old limit. + pub fn maintain_ratio(&mut self, old_limit: U256, new_limit: U256) { + self.estimate = (new_limit * self.estimate) / old_limit; + } + /// Attempt to apply the given cost to the amount of credits. /// /// If successful, the cost will be deducted successfully. @@ -188,6 +193,53 @@ impl FlowParams { } } + /// Create new flow parameters from , + /// proportion of total capacity which should be given to a peer, + /// and number of seconds of stored capacity a peer can accumulate. + pub fn from_request_times u64>( + request_time_ns: F, + load_share: f64, + max_stored_seconds: u64 + ) -> Self { + use request::Kind; + + let load_share = load_share.abs(); + + let recharge: u64 = 100_000_000; + let max = recharge.saturating_mul(max_stored_seconds); + + let cost_for_kind = |kind| { + // how many requests we can handle per second + let ns = request_time_ns(kind); + let second_duration = 1_000_000_000f64 / ns as f64; + + // scale by share of the load given to this peer. + let serve_per_second = second_duration * load_share; + let serve_per_second = serve_per_second.max(1.0 / 10_000.0); + + // as a percentage of the recharge per second. + U256::from((recharge as f64 / serve_per_second) as u64) + }; + + let costs = CostTable { + base: 0.into(), + headers: cost_for_kind(Kind::Headers), + body: cost_for_kind(Kind::Body), + receipts: cost_for_kind(Kind::Receipts), + account: cost_for_kind(Kind::Account), + storage: cost_for_kind(Kind::Storage), + code: cost_for_kind(Kind::Code), + header_proof: cost_for_kind(Kind::HeaderProof), + transaction_proof: cost_for_kind(Kind::Execution), + }; + + FlowParams { + costs: costs, + limit: max.into(), + recharge: recharge.into(), + } + } + /// Create effectively infinite flow params. pub fn free() -> Self { let free_cost: U256 = 0.into(); @@ -316,4 +368,28 @@ mod tests { assert_eq!(credits.estimate, 100.into()); } + + #[test] + fn scale_by_load_share_and_time() { + let flow_params = FlowParams::from_request_times( + |_| 10_000, + 0.05, + 60, + ); + + let flow_params2 = FlowParams::from_request_times( + |_| 10_000, + 0.1, + 60, + ); + + let flow_params3 = FlowParams::from_request_times( + |_| 5_000, + 0.05, + 60, + ); + + assert_eq!(flow_params2.costs, flow_params3.costs); + assert_eq!(flow_params.costs.headers, flow_params2.costs.headers * 2.into()); + } } diff --git a/ethcore/light/src/net/request_set.rs b/ethcore/light/src/net/request_set.rs index c5608050f..1277c2615 100644 --- a/ethcore/light/src/net/request_set.rs +++ b/ethcore/light/src/net/request_set.rs @@ -120,6 +120,8 @@ impl RequestSet { pub fn is_empty(&self) -> bool { self.len() == 0 } /// The cumulative cost of all requests in the set. + // this may be useful later for load balancing. + #[allow(dead_code)] pub fn cumulative_cost(&self) -> U256 { self.cumulative_cost } } diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index 94788a727..a4c9754ba 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -24,9 +24,8 @@ use ethcore::transaction::{Action, PendingTransaction}; use ethcore::encoded; use network::{PeerId, NodeId}; -use net::request_credits::FlowParams; use net::context::IoContext; -use net::status::{Capabilities, Status, write_handshake}; +use net::status::{Capabilities, Status}; use net::{LightProtocol, Params, packet, Peer}; use provider::Provider; use request; @@ -162,10 +161,6 @@ impl Provider for TestProvider { } } -fn make_flow_params() -> FlowParams { - FlowParams::new(5_000_000.into(), Default::default(), 100_000.into()) -} - fn capabilities() -> Capabilities { Capabilities { serve_headers: true, @@ -175,16 +170,22 @@ fn capabilities() -> Capabilities { } } +fn write_handshake(status: &Status, capabilities: &Capabilities, proto: &LightProtocol) -> Vec { + let flow_params = proto.flow_params.read().clone(); + ::net::status::write_handshake(status, capabilities, Some(&*flow_params)) +} + // helper for setting up the protocol handler and provider. -fn setup(flow_params: FlowParams, capabilities: Capabilities) -> (Arc, LightProtocol) { +fn setup(capabilities: Capabilities) -> (Arc, LightProtocol) { let provider = Arc::new(TestProviderInner { client: TestBlockChainClient::new(), }); let proto = LightProtocol::new(Arc::new(TestProvider(provider.clone())), Params { network_id: 2, - flow_params: flow_params, + config: Default::default(), capabilities: capabilities, + sample_store: None, }); (provider, proto) @@ -204,14 +205,13 @@ fn status(chain_info: BlockChainInfo) -> Status { #[test] fn handshake_expected() { - let flow_params = make_flow_params(); let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(capabilities.clone()); let status = status(provider.client.chain_info()); - let packet_body = write_handshake(&status, &capabilities, Some(&flow_params)); + let packet_body = write_handshake(&status, &capabilities, &proto); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); } @@ -219,42 +219,40 @@ fn handshake_expected() { #[test] #[should_panic] fn genesis_mismatch() { - let flow_params = make_flow_params(); let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(capabilities.clone()); let mut status = status(provider.client.chain_info()); status.genesis_hash = H256::default(); - let packet_body = write_handshake(&status, &capabilities, Some(&flow_params)); + let packet_body = write_handshake(&status, &capabilities, &proto); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); } #[test] fn credit_overflow() { - let flow_params = make_flow_params(); let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(capabilities.clone()); let status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&status, &capabilities, Some(&flow_params)); + let packet_body = write_handshake(&status, &capabilities, &proto); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); } { - let my_status = write_handshake(&status, &capabilities, Some(&flow_params)); + let my_status = write_handshake(&status, &capabilities, &proto); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } - // 1000 requests is far too many for the default flow params. + // 1 billion requests is far too many for the default flow params. let requests = encode_single(Request::Headers(IncompleteHeadersRequest { start: HashOrNumber::Number(1).into(), - max: 1000, + max: 1_000_000_000, skip: 0, reverse: false, })); @@ -268,20 +266,20 @@ fn credit_overflow() { #[test] fn get_block_headers() { - let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(capabilities.clone()); + let flow_params = proto.flow_params.read().clone(); let cur_status = status(provider.client.chain_info()); - let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + let my_status = write_handshake(&cur_status, &capabilities, &proto); provider.client.add_blocks(100, EachBlockWith::Nothing); let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + let packet_body = write_handshake(&cur_status, &capabilities, &proto); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } @@ -320,20 +318,20 @@ fn get_block_headers() { #[test] fn get_block_bodies() { - let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(capabilities.clone()); + let flow_params = proto.flow_params.read().clone(); let cur_status = status(provider.client.chain_info()); - let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + let my_status = write_handshake(&cur_status, &capabilities, &proto); provider.client.add_blocks(100, EachBlockWith::Nothing); let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + let packet_body = write_handshake(&cur_status, &capabilities, &proto); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } @@ -368,20 +366,20 @@ fn get_block_bodies() { #[test] fn get_block_receipts() { - let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(capabilities.clone()); + let flow_params = proto.flow_params.read().clone(); let cur_status = status(provider.client.chain_info()); - let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + let my_status = write_handshake(&cur_status, &capabilities, &proto); provider.client.add_blocks(1000, EachBlockWith::Nothing); let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + let packet_body = write_handshake(&cur_status, &capabilities, &proto); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } @@ -423,16 +421,17 @@ fn get_block_receipts() { #[test] fn get_state_proofs() { - let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(capabilities.clone()); + let flow_params = proto.flow_params.read().clone(); + let provider = TestProvider(provider); let cur_status = status(provider.0.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + let packet_body = write_handshake(&cur_status, &capabilities, &proto); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body); } @@ -481,15 +480,15 @@ fn get_state_proofs() { #[test] fn get_contract_code() { - let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(capabilities.clone()); + let flow_params = proto.flow_params.read().clone(); let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + let packet_body = write_handshake(&cur_status, &capabilities, &proto); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body); } @@ -524,15 +523,15 @@ fn get_contract_code() { #[test] fn proof_of_execution() { - let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(capabilities.clone()); + let flow_params = proto.flow_params.read().clone(); let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + let packet_body = write_handshake(&cur_status, &capabilities, &proto); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body); } @@ -553,7 +552,11 @@ fn proof_of_execution() { let request_body = make_packet(req_id, &requests); let response = { - let new_creds = *flow_params.limit() - flow_params.compute_cost_multi(requests.requests()); + let limit = *flow_params.limit(); + let cost = flow_params.compute_cost_multi(requests.requests()); + + println!("limit = {}, cost = {}", limit, cost); + let new_creds = limit - cost; let mut response_stream = RlpStream::new_list(3); response_stream.append(&req_id).append(&new_creds).begin_list(0); @@ -581,10 +584,10 @@ fn id_guard() { use super::request_set::RequestSet; use super::ReqId; - let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(capabilities.clone()); + let flow_params = proto.flow_params.read().clone(); let req_id_1 = ReqId(5143); let req_id_2 = ReqId(1111); @@ -607,12 +610,15 @@ fn id_guard() { local_credits: flow_params.create_credits(), status: status(provider.client.chain_info()), capabilities: capabilities.clone(), - remote_flow: Some((flow_params.create_credits(), flow_params)), + remote_flow: Some((flow_params.create_credits(), (&*flow_params).clone())), sent_head: provider.client.chain_info().best_block_hash, last_update: ::time::SteadyTime::now(), pending_requests: pending_requests, failed_requests: Vec::new(), propagated_transactions: Default::default(), + skip_update: false, + local_flow: flow_params, + awaiting_acknowledge: None, })); // first, malformed responses. diff --git a/ethcore/light/src/types/request/mod.rs b/ethcore/light/src/types/request/mod.rs index 3dd2db629..3953aa88b 100644 --- a/ethcore/light/src/types/request/mod.rs +++ b/ethcore/light/src/types/request/mod.rs @@ -256,6 +256,22 @@ pub enum CompleteRequest { Execution(CompleteExecutionRequest), } +impl CompleteRequest { + /// Inspect the kind of this response. + pub fn kind(&self) -> Kind { + match *self { + CompleteRequest::Headers(_) => Kind::Headers, + CompleteRequest::HeaderProof(_) => Kind::HeaderProof, + CompleteRequest::Receipts(_) => Kind::Receipts, + CompleteRequest::Body(_) => Kind::Body, + CompleteRequest::Account(_) => Kind::Account, + CompleteRequest::Storage(_) => Kind::Storage, + CompleteRequest::Code(_) => Kind::Code, + CompleteRequest::Execution(_) => Kind::Execution, + } + } +} + impl Request { /// Get the request kind. pub fn kind(&self) -> Kind { @@ -396,7 +412,7 @@ impl CheckedRequest for Request { /// Kinds of requests. /// Doubles as the "ID" field of the request. #[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] pub enum Kind { /// A request for headers. Headers = 0, diff --git a/sync/src/api.rs b/sync/src/api.rs index 3e3234d84..edf83ee17 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -195,21 +195,33 @@ pub struct EthSync { impl EthSync { /// Creates and register protocol with the network service pub fn new(params: Params) -> Result, NetworkError> { + const MAX_LIGHTSERV_LOAD: f64 = 0.5; + let pruning_info = params.chain.pruning_info(); let light_proto = match params.config.serve_light { false => None, true => Some({ - let light_params = LightParams { + let sample_store = params.network_config.net_config_path + .clone() + .map(::std::path::PathBuf::from) + .map(|mut p| { p.push("request_timings"); light_net::FileStore(p) }) + .map(|store| Box::new(store) as Box<_>); + + let mut light_params = LightParams { network_id: params.config.network_id, - flow_params: Default::default(), + config: Default::default(), capabilities: Capabilities { serve_headers: true, serve_chain_since: Some(pruning_info.earliest_chain), serve_state_since: Some(pruning_info.earliest_state), tx_relay: true, }, + sample_store: sample_store, }; + let max_peers = ::std::cmp::min(params.network_config.max_peers, 1); + light_params.config.load_share = MAX_LIGHTSERV_LOAD / max_peers as f64; + let mut light_proto = LightProtocol::new(params.provider, light_params); light_proto.add_handler(Arc::new(TxRelay(params.chain.clone()))); @@ -686,13 +698,14 @@ impl LightSync { let (sync, light_proto) = { let light_params = LightParams { network_id: params.network_id, - flow_params: Default::default(), // or `None`? + config: Default::default(), capabilities: Capabilities { serve_headers: false, serve_chain_since: None, serve_state_since: None, tx_relay: false, }, + sample_store: None, }; let mut light_proto = LightProtocol::new(params.client.clone(), light_params); diff --git a/sync/src/light_sync/tests/test_net.rs b/sync/src/light_sync/tests/test_net.rs index 1da4d1659..525216a7e 100644 --- a/sync/src/light_sync/tests/test_net.rs +++ b/sync/src/light_sync/tests/test_net.rs @@ -27,7 +27,6 @@ use ethcore::spec::Spec; use io::IoChannel; use light::client::Client as LightClient; use light::net::{LightProtocol, IoContext, Capabilities, Params as LightParams}; -use light::net::request_credits::FlowParams; use light::provider::LightProvider; use network::{NodeId, PeerId}; use util::RwLock; @@ -88,13 +87,14 @@ impl Peer { pub fn new_full(chain: Arc) -> Self { let params = LightParams { network_id: NETWORK_ID, - flow_params: FlowParams::free(), + config: Default::default(), capabilities: Capabilities { serve_headers: true, serve_chain_since: None, serve_state_since: None, tx_relay: true, }, + sample_store: None, }; let proto = LightProtocol::new(chain.clone(), params); @@ -110,13 +110,14 @@ impl Peer { let sync = Arc::new(LightSync::new(chain.clone()).unwrap()); let params = LightParams { network_id: NETWORK_ID, - flow_params: FlowParams::default(), + config: Default::default(), capabilities: Capabilities { serve_headers: false, serve_chain_since: None, serve_state_since: None, tx_relay: false, }, + sample_store: None, }; let provider = LightProvider::new(chain.clone(), Arc::new(RwLock::new(Default::default())));