diff --git a/Cargo.lock b/Cargo.lock index 366056581..436bf5eb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -411,6 +411,7 @@ dependencies = [ "rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "semver 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "stats 0.1.0", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", "transient-hashmap 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -444,7 +445,7 @@ dependencies = [ "ethcore-rpc 1.6.0", "ethcore-util 1.6.0", "fetch 0.1.0", - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.10.0-a.0 (git+https://github.com/ethcore/hyper)", "jsonrpc-core 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-http-server 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", @@ -553,12 +554,13 @@ dependencies = [ "ethcore-ipc-codegen 1.6.0", "ethcore-network 1.6.0", "ethcore-util 1.6.0", - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.1.0", "smallvec 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "stats 0.1.0", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -618,7 +620,7 @@ dependencies = [ "ethstore 0.1.0", "ethsync 1.6.0", "fetch 0.1.0", - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-http-server 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-ipc-server 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", @@ -633,6 +635,7 @@ dependencies = [ "serde 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 0.9.5 (registry+https://github.com/rust-lang/crates.io-index)", + "stats 0.1.0", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", "transient-hashmap 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -683,7 +686,7 @@ dependencies = [ "ethcore-ipc-codegen 1.6.0", "ethcore-ipc-nano 1.6.0", "ethcore-util 1.6.0", - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-macros 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-tcp-server 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", @@ -846,7 +849,7 @@ dependencies = [ name = "fetch" version = "0.1.0" dependencies = [ - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mime 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -865,7 +868,7 @@ dependencies = [ [[package]] name = "futures" -version = "0.1.6" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -877,7 +880,7 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1083,7 +1086,7 @@ name = "jsonrpc-core" version = "6.0.0" source = "git+https://github.com/ethcore/jsonrpc.git#86d7a89c85f324b5f6671315d9b71010ca995300" dependencies = [ - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1620,7 +1623,7 @@ dependencies = [ "ethabi 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-util 1.6.0", "fetch 0.1.0", - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mime 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "mime_guess 1.6.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1661,7 +1664,7 @@ dependencies = [ name = "parity-reactor" version = "0.1.0" dependencies = [ - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1672,7 +1675,7 @@ dependencies = [ "ethcore-rpc 1.6.0", "ethcore-signer 1.6.0", "ethcore-util 1.6.0", - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 6.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1990,7 +1993,7 @@ dependencies = [ "ethcore-bigint 0.1.2", "ethcore-rpc 1.6.0", "ethcore-util 1.6.0", - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "parity-rpc-client 1.4.0", "rpassword 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2223,6 +2226,13 @@ name = "stable-heap" version = "0.1.0" source = "git+https://github.com/carllerche/stable-heap?rev=3c5cd1ca47#3c5cd1ca4706f167a1de85658b5af0d6d3e65165" +[[package]] +name = "stats" +version = "0.1.0" +dependencies = [ + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "strsim" version = "0.3.0" @@ -2334,7 +2344,7 @@ name = "tokio-core" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2346,7 +2356,7 @@ name = "tokio-proto" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2362,7 +2372,7 @@ name = "tokio-service" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2595,7 +2605,7 @@ dependencies = [ "checksum ethabi 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5d8f6cc4c1acd005f48e1d17b06a461adac8fb6eeeb331fbf19a0e656fba91cd" "checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa" "checksum flate2 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "3eeb481e957304178d2e782f2da1257f1434dfecbae883bafb61ada2a9fea3bb" -"checksum futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "0bad0a2ac64b227fdc10c254051ae5af542cf19c9328704fd4092f7914196897" +"checksum futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "c1913eb7083840b1bbcbf9631b7fda55eaf35fe7ead13cca034e8946f9e2bc41" "checksum futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bb982bb25cd8fa5da6a8eb3a460354c984ff1113da82bcb4f0b0862b5795db82" "checksum gcc 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)" = "91ecd03771effb0c968fd6950b37e89476a578aaf1c70297d8e92b6516ec3312" "checksum gdi32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0912515a8ff24ba900422ecda800b52f4016a56251922d397c576bf92c690518" diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index 442f8b785..c8a1c7fb5 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -43,6 +43,7 @@ rlp = { path = "../util/rlp" } ethcore-stratum = { path = "../stratum" } ethcore-bloom-journal = { path = "../util/bloom" } hardware-wallet = { path = "../hw" } +stats = { path = "../util/stats" } [dependencies.hyper] git = "https://github.com/ethcore/hyper" diff --git a/ethcore/light/Cargo.toml b/ethcore/light/Cargo.toml index d2444dd59..9e10449fb 100644 --- a/ethcore/light/Cargo.toml +++ b/ethcore/light/Cargo.toml @@ -23,6 +23,7 @@ smallvec = "0.3.1" futures = "0.1" rand = "0.3" itertools = "0.5" +stats = { path = "../../util/stats" } [features] default = [] diff --git a/ethcore/light/src/cache.rs b/ethcore/light/src/cache.rs new file mode 100644 index 000000000..185007a1b --- /dev/null +++ b/ethcore/light/src/cache.rs @@ -0,0 +1,175 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Cache for data fetched from the network. +//! +//! Stores ancient block headers, bodies, receipts, and total difficulties. +//! Furthermore, stores a "gas price corpus" of relative recency, which is a sorted +//! vector of all gas prices from a recent range of blocks. + +use ethcore::encoded; +use ethcore::header::BlockNumber; +use ethcore::receipt::Receipt; + +use stats::Corpus; +use time::{SteadyTime, Duration}; +use util::{U256, H256}; +use util::cache::MemoryLruCache; + +/// Configuration for how much data to cache. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CacheSizes { + /// Maximum size, in bytes, of cached headers. + pub headers: usize, + /// Maximum size, in bytes, of cached canonical hashes. + pub canon_hashes: usize, + /// Maximum size, in bytes, of cached block bodies. + pub bodies: usize, + /// Maximum size, in bytes, of cached block receipts. + pub receipts: usize, + /// Maximum size, in bytes, of cached chain score for the block. + pub chain_score: usize, +} + +impl Default for CacheSizes { + fn default() -> Self { + const MB: usize = 1024 * 1024; + CacheSizes { + headers: 10 * MB, + canon_hashes: 3 * MB, + bodies: 20 * MB, + receipts: 10 * MB, + chain_score: 7 * MB, + } + } +} + +/// The light client data cache. +/// +/// Note that almost all getter methods take `&mut self` due to the necessity to update +/// the underlying LRU-caches on read. +pub struct Cache { + headers: MemoryLruCache, + canon_hashes: MemoryLruCache, + bodies: MemoryLruCache, + receipts: MemoryLruCache>, + chain_score: MemoryLruCache, + corpus: Option<(Corpus, SteadyTime)>, + corpus_expiration: Duration, +} + +impl Cache { + /// Create a new data cache with the given sizes and gas price corpus expiration time. + pub fn new(sizes: CacheSizes, corpus_expiration: Duration) -> Self { + Cache { + headers: MemoryLruCache::new(sizes.headers), + canon_hashes: MemoryLruCache::new(sizes.canon_hashes), + bodies: MemoryLruCache::new(sizes.bodies), + receipts: MemoryLruCache::new(sizes.receipts), + chain_score: MemoryLruCache::new(sizes.chain_score), + corpus: None, + corpus_expiration: corpus_expiration, + } + } + + /// Query header by hash. + pub fn block_header(&mut self, hash: &H256) -> Option { + self.headers.get_mut(hash).map(|x| x.clone()) + } + + /// Query hash by number. + pub fn block_hash(&mut self, num: &BlockNumber) -> Option { + self.canon_hashes.get_mut(num).map(|x| x.clone()) + } + + /// Query block body by block hash. + pub fn block_body(&mut self, hash: &H256) -> Option { + self.bodies.get_mut(hash).map(|x| x.clone()) + } + + /// Query block receipts by block hash. + pub fn block_receipts(&mut self, hash: &H256) -> Option> { + self.receipts.get_mut(hash).map(|x| x.clone()) + } + + /// Query chain score by block hash. + pub fn chain_score(&mut self, hash: &H256) -> Option { + self.chain_score.get_mut(hash).map(|x| x.clone()) + } + + /// Cache the given header. + pub fn insert_block_header(&mut self, hash: H256, hdr: encoded::Header) { + self.headers.insert(hash, hdr); + } + + /// Cache the given canonical block hash. + pub fn insert_block_hash(&mut self, num: BlockNumber, hash: H256) { + self.canon_hashes.insert(num, hash); + } + + /// Cache the given block body. + pub fn insert_block_body(&mut self, hash: H256, body: encoded::Body) { + self.bodies.insert(hash, body); + } + + /// Cache the given block receipts. + pub fn insert_block_receipts(&mut self, hash: H256, receipts: Vec) { + self.receipts.insert(hash, receipts); + } + + /// Cache the given chain scoring. + pub fn insert_chain_score(&mut self, hash: H256, score: U256) { + self.chain_score.insert(hash, score); + } + + /// Get gas price corpus, if recent enough. + pub fn gas_price_corpus(&self) -> Option> { + let now = SteadyTime::now(); + + self.corpus.as_ref().and_then(|&(ref corpus, ref tm)| { + if *tm + self.corpus_expiration >= now { + Some(corpus.clone()) + } else { + None + } + }) + } + + /// Set the cached gas price corpus. + pub fn set_gas_price_corpus(&mut self, corpus: Corpus) { + self.corpus = Some((corpus, SteadyTime::now())) + } +} + +#[cfg(test)] +mod tests { + use super::Cache; + use time::Duration; + + #[test] + fn corpus_inaccessible() { + let mut cache = Cache::new(Default::default(), Duration::hours(5)); + + cache.set_gas_price_corpus(vec![].into()); + assert_eq!(cache.gas_price_corpus(), Some(vec![].into())); + + { + let corpus_time = &mut cache.corpus.as_mut().unwrap().1; + *corpus_time = *corpus_time - Duration::hours(6); + } + assert!(cache.gas_price_corpus().is_none()); + } +} diff --git a/ethcore/light/src/client/header_chain.rs b/ethcore/light/src/client/header_chain.rs index 403d3555d..575938cd5 100644 --- a/ethcore/light/src/client/header_chain.rs +++ b/ethcore/light/src/client/header_chain.rs @@ -241,6 +241,14 @@ impl HeaderChain { self.block_header(BlockId::Latest).expect("Header for best block always stored; qed") } + /// Get an iterator over a block and its ancestry. + pub fn ancestry_iter(&self, start: BlockId) -> AncestryIter { + AncestryIter { + next: self.block_header(start), + chain: self, + } + } + /// Get the nth CHT root, if it's been computed. /// /// CHT root 0 is from block `1..2048`. @@ -295,6 +303,25 @@ impl HeapSizeOf for HeaderChain { } } +/// Iterator over a block's ancestry. +pub struct AncestryIter<'a> { + next: Option, + chain: &'a HeaderChain, +} + +impl<'a> Iterator for AncestryIter<'a> { + type Item = encoded::Header; + + fn next(&mut self) -> Option { + let next = self.next.take(); + if let Some(p_hash) = next.as_ref().map(|hdr| hdr.parent_hash()) { + self.next = self.chain.block_header(BlockId::Hash(p_hash)); + } + + next + } +} + #[cfg(test)] mod tests { use super::HeaderChain; diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index a113b4367..5701fc606 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use ethcore::block_import_error::BlockImportError; use ethcore::block_status::BlockStatus; -use ethcore::client::ClientReport; +use ethcore::client::{ClientReport, EnvInfo}; use ethcore::engines::Engine; use ethcore::ids::BlockId; use ethcore::header::Header; @@ -33,7 +33,7 @@ use io::IoChannel; use util::{Bytes, H256, Mutex, RwLock}; -use self::header_chain::HeaderChain; +use self::header_chain::{AncestryIter, HeaderChain}; pub use self::service::Service; @@ -62,6 +62,12 @@ pub trait LightChainClient: Send + Sync { /// Get the best block header. fn best_block_header(&self) -> encoded::Header; + /// Get an iterator over a block and its ancestry. + fn ancestry_iter<'a>(&'a self, start: BlockId) -> Box + 'a>; + + /// Get the signing network ID. + fn signing_network_id(&self) -> Option; + /// Query whether a block is known. fn is_known(&self, hash: &H256) -> bool; @@ -164,6 +170,16 @@ impl Client { self.chain.best_header() } + /// Get an iterator over a block and its ancestry. + pub fn ancestry_iter(&self, start: BlockId) -> AncestryIter { + self.chain.ancestry_iter(start) + } + + /// Get the signing network id. + pub fn signing_network_id(&self) -> Option { + self.engine.signing_network_id(&self.latest_env_info()) + } + /// Flush the header queue. pub fn flush_queue(&self) { self.queue.flush() @@ -217,6 +233,33 @@ impl Client { pub fn engine(&self) -> &Engine { &*self.engine } + + fn latest_env_info(&self) -> EnvInfo { + let header = self.best_block_header(); + + EnvInfo { + number: header.number(), + author: header.author(), + timestamp: header.timestamp(), + difficulty: header.difficulty(), + last_hashes: self.build_last_hashes(header.hash()), + gas_used: Default::default(), + gas_limit: header.gas_limit(), + } + } + + fn build_last_hashes(&self, mut parent_hash: H256) -> Arc> { + let mut v = Vec::with_capacity(256); + for _ in 0..255 { + v.push(parent_hash); + match self.block_header(BlockId::Hash(parent_hash)) { + Some(header) => parent_hash = header.hash(), + None => break, + } + } + + Arc::new(v) + } } impl LightChainClient for Client { @@ -234,6 +277,14 @@ impl LightChainClient for Client { Client::best_block_header(self) } + fn ancestry_iter<'a>(&'a self, start: BlockId) -> Box + 'a> { + Box::new(Client::ancestry_iter(self, start)) + } + + fn signing_network_id(&self) -> Option { + Client::signing_network_id(self) + } + fn is_known(&self, hash: &H256) -> bool { self.status(hash) == BlockStatus::InChain } diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index 94d267c7a..b6e06a02b 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -37,6 +37,7 @@ pub mod cht; pub mod net; pub mod on_demand; pub mod transaction_queue; +pub mod cache; #[cfg(not(feature = "ipc"))] pub mod provider; @@ -71,6 +72,7 @@ extern crate time; extern crate futures; extern crate rand; extern crate itertools; +extern crate stats; #[cfg(feature = "ipc")] extern crate ethcore_ipc as ipc; diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 898934965..eb5677cfa 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -322,6 +322,16 @@ impl LightProtocol { .map(|peer| peer.lock().status.clone()) } + /// Get number of (connected, active) peers. + pub fn peer_count(&self) -> (usize, usize) { + let num_pending = self.pending_peers.read().len(); + let peers = self.peers.read(); + ( + num_pending + peers.len(), + peers.values().filter(|p| !p.lock().pending_requests.is_empty()).count(), + ) + } + /// Check the maximum amount of requests of a specific type /// which a peer would be able to serve. Returns zero if the /// peer is unknown or has no buffer flow parameters. diff --git a/ethcore/light/src/net/request_set.rs b/ethcore/light/src/net/request_set.rs index c9f278776..9a26b24b1 100644 --- a/ethcore/light/src/net/request_set.rs +++ b/ethcore/light/src/net/request_set.rs @@ -110,6 +110,14 @@ impl RequestSet { pub fn collect_ids(&self) -> F where F: FromIterator { self.ids.keys().cloned().collect() } + + /// Number of requests in the set. + pub fn len(&self) -> usize { + self.ids.len() + } + + /// Whether the set is empty. + pub fn is_empty(&self) -> bool { self.len() == 0 } } #[cfg(test)] diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index c34e2d922..ec3b758ce 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -19,6 +19,7 @@ //! will take the raw data received here and extract meaningful results from it. use std::collections::HashMap; +use std::sync::Arc; use ethcore::basic_account::BasicAccount; use ethcore::encoded; @@ -28,10 +29,11 @@ use futures::{Async, Poll, Future}; use futures::sync::oneshot::{self, Sender, Receiver}; use network::PeerId; use rlp::{RlpStream, Stream}; -use util::{Bytes, RwLock, U256}; +use util::{Bytes, RwLock, Mutex, U256}; use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP}; use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; +use cache::Cache; use types::les_request::{self as les_request, Request as LesRequest}; pub mod request; @@ -42,9 +44,16 @@ struct Peer { capabilities: Capabilities, } +// Which portions of a CHT proof should be sent. +enum ChtProofSender { + Both(Sender<(encoded::Header, U256)>), + Header(Sender), + ChainScore(Sender), +} + // Attempted request info and sender to put received value. enum Pending { - HeaderByNumber(request::HeaderByNumber, Sender<(encoded::Header, U256)>), // num + CHT root + HeaderByNumber(request::HeaderByNumber, ChtProofSender), HeaderByHash(request::HeaderByHash, Sender), Block(request::Body, Sender), BlockReceipts(request::BlockReceipts, Sender>), @@ -58,30 +67,77 @@ enum Pending { pub struct OnDemand { peers: RwLock>, pending_requests: RwLock>, + cache: Arc>, orphaned_requests: RwLock>, } -impl Default for OnDemand { - fn default() -> Self { +impl OnDemand { + /// Create a new `OnDemand` service with the given cache. + pub fn new(cache: Arc>) -> Self { OnDemand { peers: RwLock::new(HashMap::new()), pending_requests: RwLock::new(HashMap::new()), + cache: cache, orphaned_requests: RwLock::new(Vec::new()), } } -} -impl OnDemand { /// Request a header by block number and CHT root hash. - /// Returns the header and the total difficulty. - pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<(encoded::Header, U256)> { + /// Returns the header. + pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver { let (sender, receiver) = oneshot::channel(); - self.dispatch_header_by_number(ctx, req, sender); + let cached = { + let mut cache = self.cache.lock(); + cache.block_hash(&req.num()).and_then(|hash| cache.block_header(&hash)) + }; + + match cached { + Some(hdr) => sender.complete(hdr), + None => self.dispatch_header_by_number(ctx, req, ChtProofSender::Header(sender)), + } + receiver + } + + /// Request a canonical block's chain score. + /// Returns the chain score. + pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver { + let (sender, receiver) = oneshot::channel(); + let cached = { + let mut cache = self.cache.lock(); + cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash)) + }; + + match cached { + Some(score) => sender.complete(score), + None => self.dispatch_header_by_number(ctx, req, ChtProofSender::ChainScore(sender)), + } + + receiver + } + + /// Request a canonical block's chain score. + /// Returns the header and chain score. + pub fn header_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<(encoded::Header, U256)> { + let (sender, receiver) = oneshot::channel(); + let cached = { + let mut cache = self.cache.lock(); + let hash = cache.block_hash(&req.num()); + ( + hash.clone().and_then(|hash| cache.block_header(&hash)), + hash.and_then(|hash| cache.chain_score(&hash)), + ) + }; + + match cached { + (Some(hdr), Some(score)) => sender.complete((hdr, score)), + _ => self.dispatch_header_by_number(ctx, req, ChtProofSender::Both(sender)), + } + receiver } // dispatch the request, completing the request if no peers available. - fn dispatch_header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber, sender: Sender<(encoded::Header, U256)>) { + fn dispatch_header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber, sender: ChtProofSender) { let num = req.num(); let cht_num = req.cht_num(); @@ -123,7 +179,10 @@ impl OnDemand { /// it as easily. pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver { let (sender, receiver) = oneshot::channel(); - self.dispatch_header_by_hash(ctx, req, sender); + match self.cache.lock().block_header(&req.0) { + Some(hdr) => sender.complete(hdr), + None => self.dispatch_header_by_hash(ctx, req, sender), + } receiver } @@ -181,7 +240,16 @@ impl OnDemand { sender.complete(encoded::Block::new(stream.out())) } else { - self.dispatch_block(ctx, req, sender); + match self.cache.lock().block_body(&req.hash) { + Some(body) => { + let mut stream = RlpStream::new_list(3); + stream.append_raw(&req.header.into_inner(), 1); + stream.append_raw(&body.into_inner(), 2); + + sender.complete(encoded::Block::new(stream.out())); + } + None => self.dispatch_block(ctx, req, sender), + } } receiver } @@ -224,7 +292,10 @@ impl OnDemand { if req.0.receipts_root() == SHA3_NULL_RLP { sender.complete(Vec::new()) } else { - self.dispatch_block_receipts(ctx, req, sender); + match self.cache.lock().block_receipts(&req.0.hash()) { + Some(receipts) => sender.complete(receipts), + None => self.dispatch_block_receipts(ctx, req, sender), + } } receiver @@ -378,8 +449,15 @@ impl OnDemand { for orphaned in to_dispatch { match orphaned { - Pending::HeaderByNumber(req, mut sender) => - if !check_hangup(&mut sender) { self.dispatch_header_by_number(ctx, req, sender) }, + Pending::HeaderByNumber(req, mut sender) => { + let hangup = match sender { + ChtProofSender::Both(ref mut s) => check_hangup(s), + ChtProofSender::Header(ref mut s) => check_hangup(s), + ChtProofSender::ChainScore(ref mut s) => check_hangup(s), + }; + + if !hangup { self.dispatch_header_by_number(ctx, req, sender) } + } Pending::HeaderByHash(req, mut sender) => if !check_hangup(&mut sender) { self.dispatch_header_by_hash(ctx, req, sender) }, Pending::Block(req, mut sender) => @@ -439,8 +517,19 @@ impl Handler for OnDemand { Pending::HeaderByNumber(req, sender) => { if let Some(&(ref header, ref proof)) = proofs.get(0) { match req.check_response(header, proof) { - Ok(header) => { - sender.complete(header); + Ok((header, score)) => { + let mut cache = self.cache.lock(); + let hash = header.hash(); + cache.insert_block_header(hash, header.clone()); + cache.insert_block_hash(header.number(), hash); + cache.insert_chain_score(hash, score); + + match sender { + ChtProofSender::Both(sender) => sender.complete((header, score)), + ChtProofSender::Header(sender) => sender.complete(header), + ChtProofSender::ChainScore(sender) => sender.complete(score), + } + return } Err(e) => { @@ -468,6 +557,7 @@ impl Handler for OnDemand { if let Some(ref header) = headers.get(0) { match req.check_response(header) { Ok(header) => { + self.cache.lock().insert_block_header(req.0, header.clone()); sender.complete(header); return } @@ -493,9 +583,11 @@ impl Handler for OnDemand { match req { Pending::Block(req, sender) => { - if let Some(ref block) = bodies.get(0) { - match req.check_response(block) { + if let Some(ref body) = bodies.get(0) { + match req.check_response(body) { Ok(block) => { + let body = encoded::Body::new(body.to_vec()); + self.cache.lock().insert_block_body(req.hash, body); sender.complete(block); return } @@ -524,6 +616,8 @@ impl Handler for OnDemand { if let Some(ref receipts) = receipts.get(0) { match req.check_response(receipts) { Ok(receipts) => { + let hash = req.0.hash(); + self.cache.lock().insert_block_receipts(hash, receipts.clone()); sender.complete(receipts); return } @@ -604,10 +698,16 @@ impl Handler for OnDemand { #[cfg(test)] mod tests { use super::*; + + use std::sync::Arc; + + use cache::Cache; use net::{Announcement, BasicContext, ReqId, Error as LesError}; use request::{Request as LesRequest, Kind as LesRequestKind}; + use network::{PeerId, NodeId}; - use util::H256; + use time::Duration; + use util::{H256, Mutex}; struct FakeContext; @@ -624,7 +724,8 @@ mod tests { #[test] fn detects_hangup() { - let on_demand = OnDemand::default(); + let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6)))); + let on_demand = OnDemand::new(cache); let result = on_demand.header_by_hash(&FakeContext, request::HeaderByHash(H256::default())); assert!(on_demand.orphaned_requests.read().len() == 1); diff --git a/ethcore/light/src/transaction_queue.rs b/ethcore/light/src/transaction_queue.rs index 8ca6a64f6..d17a863f5 100644 --- a/ethcore/light/src/transaction_queue.rs +++ b/ethcore/light/src/transaction_queue.rs @@ -245,6 +245,31 @@ impl TransactionQueue { .collect() } + /// Get all transactions not ready to be propagated. + /// `best_block_number` and `best_block_timestamp` are used to filter out conditionally + /// propagated transactions. + /// + /// Returned transactions are batched by sender, in order of ascending nonce. + pub fn future_transactions(&self, best_block_number: u64, best_block_timestamp: u64) -> Vec { + self.by_account.values() + .flat_map(|acct_txs| { + acct_txs.current.iter().skip_while(|tx| match tx.condition { + None => true, + Some(Condition::Number(blk_num)) => blk_num <= best_block_number, + Some(Condition::Timestamp(time)) => time <= best_block_timestamp, + }).chain(acct_txs.future.values()).map(|info| info.hash) + }) + .filter_map(|hash| match self.by_hash.get(&hash) { + Some(tx) => Some(tx.clone()), + None => { + warn!(target: "txqueue", "Inconsistency detected between `by_hash` and `by_account`: {} not stored.", + hash); + None + } + }) + .collect() + } + /// Addresses for which we store transactions. pub fn queued_senders(&self) -> Vec
{ self.by_account.keys().cloned().collect() @@ -471,4 +496,22 @@ mod tests { assert!(txq.transaction(&hash).is_none()); } + + #[test] + fn future_transactions() { + let sender = Address::default(); + let mut txq = TransactionQueue::default(); + + for i in (0..1).chain(3..10) { + let mut tx = Transaction::default(); + tx.nonce = i.into(); + + let tx = tx.fake_sign(sender); + + txq.import(tx.into()).unwrap(); + } + + assert_eq!(txq.future_transactions(0, 0).len(), 7); + assert_eq!(txq.next_nonce(&sender).unwrap(), 1.into()); + } } diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index dce708b3a..6e1ea9d31 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -16,7 +16,6 @@ use std::collections::BTreeMap; use util::{U256, Address, H256, H2048, Bytes, Itertools}; -use util::stats::Histogram; use blockchain::TreeRoute; use verification::queue::QueueInfo as BlockQueueInfo; use block::{OpenBlock, SealedBlock}; @@ -212,38 +211,24 @@ pub trait BlockChainClient : Sync + Send { fn ready_transactions(&self) -> Vec; /// Sorted list of transaction gas prices from at least last sample_size blocks. - fn gas_price_corpus(&self, sample_size: usize) -> Vec { + fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus { let mut h = self.chain_info().best_block_hash; let mut corpus = Vec::new(); while corpus.is_empty() { for _ in 0..sample_size { - let block = self.block(BlockId::Hash(h)).expect("h is either the best_block_hash or an ancestor; qed"); - let header = block.header_view(); - if header.number() == 0 { - corpus.sort(); - return corpus; + let block = match self.block(BlockId::Hash(h)) { + Some(block) => block, + None => return corpus.into(), + }; + + if block.number() == 0 { + return corpus.into(); } block.transaction_views().iter().foreach(|t| corpus.push(t.gas_price())); - h = header.parent_hash().clone(); + h = block.parent_hash().clone(); } } - corpus.sort(); - corpus - } - - /// Calculate median gas price from recent blocks if they have any transactions. - fn gas_price_median(&self, sample_size: usize) -> Option { - let corpus = self.gas_price_corpus(sample_size); - corpus.get(corpus.len() / 2).cloned() - } - - /// Get the gas price distribution based on recent blocks if they have any transactions. - fn gas_price_histogram(&self, sample_size: usize, bucket_number: usize) -> Option { - let raw_corpus = self.gas_price_corpus(sample_size); - let raw_len = raw_corpus.len(); - // Throw out outliers. - let (corpus, _) = raw_corpus.split_at(raw_len - raw_len / 40); - Histogram::new(corpus, bucket_number) + corpus.into() } /// Get the preferred network ID to sign on diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index be5247340..3a56db51b 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -106,6 +106,7 @@ extern crate lru_cache; extern crate ethcore_stratum; extern crate ethabi; extern crate hardware_wallet; +extern crate stats; #[macro_use] extern crate log; diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index d37551231..d439d7057 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -27,7 +27,6 @@ use miner::Miner; use rlp::View; use spec::Spec; use views::BlockView; -use util::stats::Histogram; use ethkey::{KeyPair, Secret}; use transaction::{PendingTransaction, Transaction, Action, Condition}; use miner::MinerService; @@ -208,11 +207,11 @@ fn can_collect_garbage() { fn can_generate_gas_price_median() { let client_result = generate_dummy_client_with_data(3, 1, slice_into![1, 2, 3]); let client = client_result.reference(); - assert_eq!(Some(U256::from(2)), client.gas_price_median(3)); + assert_eq!(Some(&U256::from(2)), client.gas_price_corpus(3).median()); let client_result = generate_dummy_client_with_data(4, 1, slice_into![1, 4, 3, 2]); let client = client_result.reference(); - assert_eq!(Some(U256::from(3)), client.gas_price_median(4)); + assert_eq!(Some(&U256::from(3)), client.gas_price_corpus(3).median()); } #[test] @@ -220,8 +219,8 @@ fn can_generate_gas_price_histogram() { let client_result = generate_dummy_client_with_data(20, 1, slice_into![6354,8593,6065,4842,7845,7002,689,4958,4250,6098,5804,4320,643,8895,2296,8589,7145,2000,2512,1408]); let client = client_result.reference(); - let hist = client.gas_price_histogram(20, 5).unwrap(); - let correct_hist = Histogram { bucket_bounds: vec_into![643, 2294, 3945, 5596, 7247, 8898], counts: vec![4,2,4,6,4] }; + let hist = client.gas_price_corpus(20).histogram(5).unwrap(); + let correct_hist = ::stats::Histogram { bucket_bounds: vec_into![643, 2294, 3945, 5596, 7247, 8898], counts: vec![4,2,4,6,4] }; assert_eq!(hist, correct_hist); } @@ -230,7 +229,7 @@ fn empty_gas_price_histogram() { let client_result = generate_dummy_client_with_data(20, 0, slice_into![]); let client = client_result.reference(); - assert!(client.gas_price_histogram(20, 5).is_none()); + assert!(client.gas_price_corpus(20).histogram(5).is_none()); } #[test] diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 8b8f9ecd7..91058b990 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -39,6 +39,7 @@ rlp = { path = "../util/rlp" } fetch = { path = "../util/fetch" } parity-reactor = { path = "../util/reactor" } clippy = { version = "0.0.103", optional = true} +stats = { path = "../util/stats" } [features] dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev"] diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 030a03702..201f41c22 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -44,6 +44,7 @@ extern crate futures; extern crate order_stat; extern crate parity_updater as updater; extern crate parity_reactor; +extern crate stats; #[macro_use] extern crate log; diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index de0207d79..0bea7f9a1 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -20,16 +20,19 @@ use std::fmt::Debug; use std::ops::Deref; use std::sync::{Arc, Weak}; -use futures::{future, Future, BoxFuture}; +use futures::{future, stream, Future, Stream, BoxFuture}; +use light::cache::Cache as LightDataCache; use light::client::LightChainClient; use light::on_demand::{request, OnDemand}; use light::TransactionQueue as LightTransactionQueue; -use rlp::{self, Stream}; -use util::{Address, H520, H256, U256, Uint, Bytes, RwLock}; +use rlp::{self, Stream as StreamRlp}; +use util::{Address, H520, H256, U256, Uint, Bytes, Mutex, RwLock}; use util::sha3::Hashable; +use stats::Corpus; use ethkey::Signature; use ethsync::LightSync; +use ethcore::ids::BlockId; use ethcore::miner::MinerService; use ethcore::client::MiningBlockChainClient; use ethcore::transaction::{Action, SignedTransaction, PendingTransaction, Transaction}; @@ -159,10 +162,16 @@ impl Dispatcher for FullDispatcher, - client: Arc, - on_demand: Arc, - transaction_queue: Arc>, + /// Sync service. + pub sync: Arc, + /// Header chain client. + pub client: Arc, + /// On-demand request service. + pub on_demand: Arc, + /// Data cache. + pub cache: Arc>, + /// Transaction queue. + pub transaction_queue: Arc>, } impl LightDispatcher { @@ -173,43 +182,121 @@ impl LightDispatcher { sync: Arc, client: Arc, on_demand: Arc, + cache: Arc>, transaction_queue: Arc>, ) -> Self { LightDispatcher { sync: sync, client: client, on_demand: on_demand, + cache: cache, transaction_queue: transaction_queue, } } + + /// Get a recent gas price corpus. + // TODO: this could be `impl Trait`. + pub fn gas_price_corpus(&self) -> BoxFuture, Error> { + const GAS_PRICE_SAMPLE_SIZE: usize = 100; + + if let Some(cached) = self.cache.lock().gas_price_corpus() { + return future::ok(cached).boxed() + } + + let cache = self.cache.clone(); + let eventual_corpus = self.sync.with_context(|ctx| { + // get some recent headers with gas used, + // and request each of the blocks from the network. + let block_futures = self.client.ancestry_iter(BlockId::Latest) + .filter(|hdr| hdr.gas_used() != U256::default()) + .take(GAS_PRICE_SAMPLE_SIZE) + .map(request::Body::new) + .map(|req| self.on_demand.block(ctx, req)); + + // as the blocks come in, collect gas prices into a vector + stream::futures_unordered(block_futures) + .fold(Vec::new(), |mut v, block| { + for t in block.transaction_views().iter() { + v.push(t.gas_price()) + } + + future::ok(v) + }) + .map(move |v| { + // produce a corpus from the vector, cache it, and return + // the median as the intended gas price. + let corpus: ::stats::Corpus<_> = v.into(); + cache.lock().set_gas_price_corpus(corpus.clone()); + corpus + }) + }); + + match eventual_corpus { + Some(corp) => corp.map_err(|_| errors::no_light_peers()).boxed(), + None => future::err(errors::network_disabled()).boxed(), + } + } + + /// Get an account's next nonce. + pub fn next_nonce(&self, addr: Address) -> BoxFuture { + // fast path where we don't go to network; nonce provided or can be gotten from queue. + let maybe_nonce = self.transaction_queue.read().next_nonce(&addr); + if let Some(nonce) = maybe_nonce { + return future::ok(nonce).boxed() + } + + let best_header = self.client.best_block_header(); + let nonce_future = self.sync.with_context(|ctx| self.on_demand.account(ctx, request::Account { + header: best_header, + address: addr, + })); + + match nonce_future { + Some(x) => x.map(|acc| acc.nonce).map_err(|_| errors::no_light_peers()).boxed(), + None => future::err(errors::network_disabled()).boxed() + } + } } impl Dispatcher for LightDispatcher { fn fill_optional_fields(&self, request: TransactionRequest, default_sender: Address) -> BoxFuture { - let request = request; - let gas_limit = self.client.best_block_header().gas_limit(); + const DEFAULT_GAS_PRICE: U256 = U256([0, 0, 0, 21_000_000]); - future::ok(FilledTransactionRequest { - from: request.from.unwrap_or(default_sender), - used_default_from: request.from.is_none(), - to: request.to, - nonce: request.nonce, - gas_price: request.gas_price.unwrap_or_else(|| 21_000_000.into()), // TODO: fetch corpus from network. - gas: request.gas.unwrap_or_else(|| gas_limit / 3.into()), - value: request.value.unwrap_or_else(|| 0.into()), - data: request.data.unwrap_or_else(Vec::new), - condition: request.condition, - }).boxed() + let gas_limit = self.client.best_block_header().gas_limit(); + let request_gas_price = request.gas_price.clone(); + + let with_gas_price = move |gas_price| { + let request = request; + FilledTransactionRequest { + from: request.from.unwrap_or(default_sender), + used_default_from: request.from.is_none(), + to: request.to, + nonce: request.nonce, + gas_price: gas_price, + gas: request.gas.unwrap_or_else(|| gas_limit / 3.into()), + value: request.value.unwrap_or_else(|| 0.into()), + data: request.data.unwrap_or_else(Vec::new), + condition: request.condition, + } + }; + + // fast path for known gas price. + match request_gas_price { + Some(gas_price) => future::ok(with_gas_price(gas_price)).boxed(), + None => self.gas_price_corpus().and_then(|corp| match corp.median() { + Some(median) => future::ok(*median), + None => future::ok(DEFAULT_GAS_PRICE), // fall back to default on error. + }).map(with_gas_price).boxed() + } } fn sign(&self, accounts: Arc, filled: FilledTransactionRequest, password: SignWith) -> BoxFuture, Error> { - let network_id = None; // TODO: fetch from client. + let network_id = self.client.signing_network_id(); let address = filled.from; - let best_header = self.client.best_block_header(); let with_nonce = move |filled: FilledTransactionRequest, nonce| { let t = Transaction { @@ -234,25 +321,14 @@ impl Dispatcher for LightDispatcher { })) }; - // fast path where we don't go to network; nonce provided or can be gotten from queue. - let maybe_nonce = filled.nonce.or_else(|| self.transaction_queue.read().next_nonce(&address)); - if let Some(nonce) = maybe_nonce { + // fast path for pre-filled nonce. + if let Some(nonce) = filled.nonce { return future::done(with_nonce(filled, nonce)).boxed() } - let nonce_future = self.sync.with_context(|ctx| self.on_demand.account(ctx, request::Account { - header: best_header, - address: address, - })); - - let nonce_future = match nonce_future { - Some(x) => x, - None => return future::err(errors::no_light_peers()).boxed() - }; - - nonce_future + self.next_nonce(address) .map_err(|_| errors::no_light_peers()) - .and_then(move |acc| with_nonce(filled, acc.nonce)) + .and_then(move |nonce| with_nonce(filled, nonce)) .boxed() } @@ -453,7 +529,7 @@ fn decrypt(accounts: &AccountProvider, address: Address, msg: Bytes, password: S pub fn default_gas_price(client: &C, miner: &M) -> U256 where C: MiningBlockChainClient, M: MinerService { - client.gas_price_median(100).unwrap_or_else(|| miner.sensible_gas_price()) + client.gas_price_corpus(100).median().cloned().unwrap_or_else(|| miner.sensible_gas_price()) } /// Convert RPC confirmation payload to signer confirmation payload. diff --git a/rpc/src/v1/helpers/errors.rs b/rpc/src/v1/helpers/errors.rs index 93d23b1aa..8c8cf617e 100644 --- a/rpc/src/v1/helpers/errors.rs +++ b/rpc/src/v1/helpers/errors.rs @@ -60,6 +60,14 @@ pub fn unimplemented(details: Option) -> Error { } } +pub fn light_unimplemented(details: Option) -> Error { + Error { + code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST), + message: "This request is unsupported for light clients.".into(), + data: details.map(Value::String), + } +} + pub fn request_not_found() -> Error { Error { code: ErrorCode::ServerError(codes::REQUEST_NOT_FOUND), diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index 2e129d31e..6251b67fc 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -62,11 +62,6 @@ pub struct EthClient { accounts: Arc, } -// helper for internal error: no network context. -fn err_no_context() -> Error { - errors::internal("network service detached", "") -} - // helper for internal error: on demand sender cancelled. fn err_premature_cancel(_cancel: oneshot::Canceled) -> Error { errors::internal("on-demand sender prematurely cancelled", "") @@ -108,7 +103,7 @@ impl EthClient { self.sync.with_context(|ctx| self.on_demand.header_by_number(ctx, req) - .map(|(h, _)| Some(h)) + .map(Some) .map_err(err_premature_cancel) .boxed() ) @@ -128,10 +123,9 @@ impl EthClient { _ => None, // latest, earliest, and pending will have all already returned. }; - // todo: cache returned values (header, TD) match maybe_future { Some(recv) => recv, - None => future::err(err_no_context()).boxed() + None => future::err(errors::network_disabled()).boxed() } } @@ -150,7 +144,7 @@ impl EthClient { address: address, }).map(Some)) .map(|x| x.map_err(err_premature_cancel).boxed()) - .unwrap_or_else(|| future::err(err_no_context()).boxed()) + .unwrap_or_else(|| future::err(errors::network_disabled()).boxed()) }).boxed() } } @@ -235,7 +229,7 @@ impl Eth for EthClient { sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr))) .map(|x| x.map(|b| Some(U256::from(b.transactions_count()).into()))) .map(|x| x.map_err(err_premature_cancel).boxed()) - .unwrap_or_else(|| future::err(err_no_context()).boxed()) + .unwrap_or_else(|| future::err(errors::network_disabled()).boxed()) } }).boxed() } @@ -255,7 +249,7 @@ impl Eth for EthClient { sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr))) .map(|x| x.map(|b| Some(U256::from(b.transactions_count()).into()))) .map(|x| x.map_err(err_premature_cancel).boxed()) - .unwrap_or_else(|| future::err(err_no_context()).boxed()) + .unwrap_or_else(|| future::err(errors::network_disabled()).boxed()) } }).boxed() } @@ -275,7 +269,7 @@ impl Eth for EthClient { sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr))) .map(|x| x.map(|b| Some(U256::from(b.uncles_count()).into()))) .map(|x| x.map_err(err_premature_cancel).boxed()) - .unwrap_or_else(|| future::err(err_no_context()).boxed()) + .unwrap_or_else(|| future::err(errors::network_disabled()).boxed()) } }).boxed() } @@ -295,7 +289,7 @@ impl Eth for EthClient { sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr))) .map(|x| x.map(|b| Some(U256::from(b.uncles_count()).into()))) .map(|x| x.map_err(err_premature_cancel).boxed()) - .unwrap_or_else(|| future::err(err_no_context()).boxed()) + .unwrap_or_else(|| future::err(errors::network_disabled()).boxed()) } }).boxed() } diff --git a/rpc/src/v1/impls/light/mod.rs b/rpc/src/v1/impls/light/mod.rs index 1772d5b58..8c2e6d240 100644 --- a/rpc/src/v1/impls/light/mod.rs +++ b/rpc/src/v1/impls/light/mod.rs @@ -15,7 +15,15 @@ // along with Parity. If not, see . //! RPC implementations for the light client. +//! +//! This doesn't re-implement all of the RPC APIs, just those which aren't +//! significantly generic to be reused. pub mod eth; +pub mod parity; +pub mod parity_set; +pub mod trace; pub use self::eth::EthClient; +pub use self::parity::ParityClient; +pub use self::parity_set::ParitySetClient; diff --git a/rpc/src/v1/impls/light/parity.rs b/rpc/src/v1/impls/light/parity.rs new file mode 100644 index 000000000..332088dd6 --- /dev/null +++ b/rpc/src/v1/impls/light/parity.rs @@ -0,0 +1,332 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Parity-specific rpc implementation. +use std::sync::Arc; +use std::collections::{BTreeMap, HashSet}; +use futures::{future, Future, BoxFuture}; + +use util::RotatingLogger; +use util::misc::version_data; + +use crypto::ecies; +use ethkey::{Brain, Generator}; +use ethstore::random_phrase; +use ethsync::LightSyncProvider; +use ethcore::account_provider::AccountProvider; + +use jsonrpc_core::Error; +use jsonrpc_macros::Trailing; +use v1::helpers::{errors, SigningQueue, SignerService, NetworkSettings}; +use v1::helpers::dispatch::{LightDispatcher, DEFAULT_MAC}; +use v1::metadata::Metadata; +use v1::traits::Parity; +use v1::types::{ + Bytes, U256, H160, H256, H512, + Peers, Transaction, RpcSettings, Histogram, + TransactionStats, LocalTransactionStatus, + BlockNumber, ConsensusCapability, VersionInfo, + OperationsInfo, DappId, ChainStatus, + AccountInfo, HwAccountInfo +}; + +/// Parity implementation for light client. +pub struct ParityClient { + light_dispatch: Arc, + accounts: Arc, + logger: Arc, + settings: Arc, + signer: Option>, + dapps_interface: Option, + dapps_port: Option, +} + +impl ParityClient { + /// Creates new `ParityClient`. + pub fn new( + light_dispatch: Arc, + accounts: Arc, + logger: Arc, + settings: Arc, + signer: Option>, + dapps_interface: Option, + dapps_port: Option, + ) -> Self { + ParityClient { + light_dispatch: light_dispatch, + accounts: accounts, + logger: logger, + settings: settings, + signer: signer, + dapps_interface: dapps_interface, + dapps_port: dapps_port, + } + } +} + +impl Parity for ParityClient { + type Metadata = Metadata; + + fn accounts_info(&self, dapp: Trailing) -> Result, Error> { + let dapp = dapp.0; + + let store = &self.accounts; + let dapp_accounts = store + .note_dapp_used(dapp.clone().into()) + .and_then(|_| store.dapp_addresses(dapp.into())) + .map_err(|e| errors::internal("Could not fetch accounts.", e))? + .into_iter().collect::>(); + + let info = store.accounts_info().map_err(|e| errors::account("Could not fetch account info.", e))?; + let other = store.addresses_info(); + + Ok(info + .into_iter() + .chain(other.into_iter()) + .filter(|&(ref a, _)| dapp_accounts.contains(a)) + .map(|(a, v)| (H160::from(a), AccountInfo { name: v.name })) + .collect() + ) + } + + fn hardware_accounts_info(&self) -> Result, Error> { + let store = &self.accounts; + let info = store.hardware_accounts_info().map_err(|e| errors::account("Could not fetch account info.", e))?; + Ok(info + .into_iter() + .map(|(a, v)| (H160::from(a), HwAccountInfo { name: v.name, manufacturer: v.meta })) + .collect() + ) + } + + fn default_account(&self, meta: Self::Metadata) -> BoxFuture { + let dapp_id = meta.dapp_id(); + future::ok(self.accounts + .dapp_addresses(dapp_id.into()) + .ok() + .and_then(|accounts| accounts.get(0).cloned()) + .map(|acc| acc.into()) + .unwrap_or_default() + ).boxed() + } + + fn transactions_limit(&self) -> Result { + Ok(usize::max_value()) + } + + fn min_gas_price(&self) -> Result { + Ok(U256::default()) + } + + fn extra_data(&self) -> Result { + Ok(Bytes::default()) + } + + fn gas_floor_target(&self) -> Result { + Ok(U256::default()) + } + + fn gas_ceil_target(&self) -> Result { + Ok(U256::default()) + } + + fn dev_logs(&self) -> Result, Error> { + let logs = self.logger.logs(); + Ok(logs.as_slice().to_owned()) + } + + fn dev_logs_levels(&self) -> Result { + Ok(self.logger.levels().to_owned()) + } + + fn net_chain(&self) -> Result { + Ok(self.settings.chain.clone()) + } + + fn net_peers(&self) -> Result { + let peers = self.light_dispatch.sync.peers().into_iter().map(Into::into).collect(); + let peer_numbers = self.light_dispatch.sync.peer_numbers(); + + Ok(Peers { + active: peer_numbers.active, + connected: peer_numbers.connected, + max: peer_numbers.max as u32, + peers: peers, + }) + } + + fn net_port(&self) -> Result { + Ok(self.settings.network_port) + } + + fn node_name(&self) -> Result { + Ok(self.settings.name.clone()) + } + + fn registry_address(&self) -> Result, Error> { + Err(errors::light_unimplemented(None)) + } + + fn rpc_settings(&self) -> Result { + Ok(RpcSettings { + enabled: self.settings.rpc_enabled, + interface: self.settings.rpc_interface.clone(), + port: self.settings.rpc_port as u64, + }) + } + + fn default_extra_data(&self) -> Result { + Ok(Bytes::new(version_data())) + } + + fn gas_price_histogram(&self) -> BoxFuture { + self.light_dispatch.gas_price_corpus() + .and_then(|corpus| corpus.histogram(10).ok_or_else(errors::not_enough_data)) + .map(Into::into) + .boxed() + } + + fn unsigned_transactions_count(&self) -> Result { + match self.signer { + None => Err(errors::signer_disabled()), + Some(ref signer) => Ok(signer.len()), + } + } + + fn generate_secret_phrase(&self) -> Result { + Ok(random_phrase(12)) + } + + fn phrase_to_address(&self, phrase: String) -> Result { + Ok(Brain::new(phrase).generate().unwrap().address().into()) + } + + fn list_accounts(&self, _: u64, _: Option, _: Trailing) -> Result>, Error> { + Err(errors::light_unimplemented(None)) + } + + fn list_storage_keys(&self, _: H160, _: u64, _: Option, _: Trailing) -> Result>, Error> { + Err(errors::light_unimplemented(None)) + } + + fn encrypt_message(&self, key: H512, phrase: Bytes) -> Result { + ecies::encrypt(&key.into(), &DEFAULT_MAC, &phrase.0) + .map_err(errors::encryption_error) + .map(Into::into) + } + + fn pending_transactions(&self) -> Result, Error> { + let txq = self.light_dispatch.transaction_queue.read(); + let chain_info = self.light_dispatch.client.chain_info(); + Ok( + txq.ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp) + .into_iter() + .map(Into::into) + .collect::>() + ) + } + + fn future_transactions(&self) -> Result, Error> { + let txq = self.light_dispatch.transaction_queue.read(); + let chain_info = self.light_dispatch.client.chain_info(); + Ok( + txq.future_transactions(chain_info.best_block_number, chain_info.best_block_timestamp) + .into_iter() + .map(Into::into) + .collect::>() + ) + } + + fn pending_transactions_stats(&self) -> Result, Error> { + let stats = self.light_dispatch.sync.transactions_stats(); + Ok(stats.into_iter() + .map(|(hash, stats)| (hash.into(), stats.into())) + .collect() + ) + } + + fn local_transactions(&self) -> Result, Error> { + let mut map = BTreeMap::new(); + let chain_info = self.light_dispatch.client.chain_info(); + let (best_num, best_tm) = (chain_info.best_block_number, chain_info.best_block_timestamp); + let txq = self.light_dispatch.transaction_queue.read(); + + for pending in txq.ready_transactions(best_num, best_tm) { + map.insert(pending.hash().into(), LocalTransactionStatus::Pending); + } + + for future in txq.future_transactions(best_num, best_tm) { + map.insert(future.hash().into(), LocalTransactionStatus::Future); + } + + // TODO: other types? + + Ok(map) + } + + fn signer_port(&self) -> Result { + self.signer + .clone() + .and_then(|signer| signer.address()) + .map(|address| address.1) + .ok_or_else(|| errors::signer_disabled()) + } + + fn dapps_port(&self) -> Result { + self.dapps_port + .ok_or_else(|| errors::dapps_disabled()) + } + + fn dapps_interface(&self) -> Result { + self.dapps_interface.clone() + .ok_or_else(|| errors::dapps_disabled()) + } + + fn next_nonce(&self, address: H160) -> BoxFuture { + self.light_dispatch.next_nonce(address.into()).map(Into::into).boxed() + } + + fn mode(&self) -> Result { + Err(errors::light_unimplemented(None)) + } + + fn enode(&self) -> Result { + self.light_dispatch.sync.enode().ok_or_else(errors::network_disabled) + } + + fn consensus_capability(&self) -> Result { + Err(errors::light_unimplemented(None)) + } + + fn version_info(&self) -> Result { + Err(errors::light_unimplemented(None)) + } + + fn releases_info(&self) -> Result, Error> { + Err(errors::light_unimplemented(None)) + } + + fn chain_status(&self) -> Result { + let chain_info = self.light_dispatch.client.chain_info(); + + let gap = chain_info.ancient_block_number.map(|x| U256::from(x + 1)) + .and_then(|first| chain_info.first_block_number.map(|last| (first, U256::from(last)))); + + Ok(ChainStatus { + block_gap: gap.map(|(x, y)| (x.into(), y.into())), + }) + } +} diff --git a/rpc/src/v1/impls/light/parity_set.rs b/rpc/src/v1/impls/light/parity_set.rs new file mode 100644 index 000000000..720af0dd9 --- /dev/null +++ b/rpc/src/v1/impls/light/parity_set.rs @@ -0,0 +1,138 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Parity-specific rpc interface for operations altering the settings. +//! Implementation for light client. + +use std::io; +use std::sync::Arc; + +use ethsync::ManageNetwork; +use fetch::Fetch; +use futures::{BoxFuture, Future}; +use util::sha3; + +use jsonrpc_core::Error; +use v1::helpers::errors; +use v1::traits::ParitySet; +use v1::types::{Bytes, H160, H256, U256, ReleaseInfo}; + +/// Parity-specific rpc interface for operations altering the settings. +pub struct ParitySetClient { + net: Arc, + fetch: F, +} + +impl ParitySetClient { + /// Creates new `ParitySetClient` with given `Fetch`. + pub fn new(net: Arc, fetch: F) -> Self { + ParitySetClient { + net: net, + fetch: fetch, + } + } +} + +impl ParitySet for ParitySetClient { + fn set_min_gas_price(&self, _gas_price: U256) -> Result { + Err(errors::light_unimplemented(None)) + } + + fn set_gas_floor_target(&self, _target: U256) -> Result { + Err(errors::light_unimplemented(None)) + } + + fn set_gas_ceil_target(&self, _target: U256) -> Result { + Err(errors::light_unimplemented(None)) + } + + fn set_extra_data(&self, _extra_data: Bytes) -> Result { + Err(errors::light_unimplemented(None)) + } + + fn set_author(&self, _author: H160) -> Result { + Err(errors::light_unimplemented(None)) + } + + fn set_engine_signer(&self, _address: H160, _password: String) -> Result { + Err(errors::light_unimplemented(None)) + } + + fn set_transactions_limit(&self, _limit: usize) -> Result { + Err(errors::light_unimplemented(None)) + } + + fn set_tx_gas_limit(&self, _limit: U256) -> Result { + Err(errors::light_unimplemented(None)) + } + + fn add_reserved_peer(&self, peer: String) -> Result { + match self.net.add_reserved_peer(peer) { + Ok(()) => Ok(true), + Err(e) => Err(errors::invalid_params("Peer address", e)), + } + } + + fn remove_reserved_peer(&self, peer: String) -> Result { + match self.net.remove_reserved_peer(peer) { + Ok(()) => Ok(true), + Err(e) => Err(errors::invalid_params("Peer address", e)), + } + } + + fn drop_non_reserved_peers(&self) -> Result { + self.net.deny_unreserved_peers(); + Ok(true) + } + + fn accept_non_reserved_peers(&self) -> Result { + self.net.accept_unreserved_peers(); + Ok(true) + } + + fn start_network(&self) -> Result { + self.net.start_network(); + Ok(true) + } + + fn stop_network(&self) -> Result { + self.net.stop_network(); + Ok(true) + } + + fn set_mode(&self, _mode: String) -> Result { + Err(errors::light_unimplemented(None)) + } + + fn hash_content(&self, url: String) -> BoxFuture { + self.fetch.process(self.fetch.fetch(&url).then(move |result| { + result + .map_err(errors::from_fetch_error) + .and_then(|response| { + sha3(&mut io::BufReader::new(response)).map_err(errors::from_fetch_error) + }) + .map(Into::into) + })) + } + + fn upgrade_ready(&self) -> Result, Error> { + Err(errors::light_unimplemented(None)) + } + + fn execute_upgrade(&self) -> Result { + Err(errors::light_unimplemented(None)) + } +} diff --git a/rpc/src/v1/impls/light/trace.rs b/rpc/src/v1/impls/light/trace.rs new file mode 100644 index 000000000..5f785ed1b --- /dev/null +++ b/rpc/src/v1/impls/light/trace.rs @@ -0,0 +1,57 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Traces api implementation. + +use jsonrpc_core::Error; +use jsonrpc_macros::Trailing; +use v1::traits::Traces; +use v1::helpers::errors; +use v1::types::{TraceFilter, LocalizedTrace, BlockNumber, Index, CallRequest, Bytes, TraceResults, H256}; + +/// Traces api implementation. +// TODO: all calling APIs should be possible w. proved remote TX execution. +pub struct TracesClient; + +impl Traces for TracesClient { + fn filter(&self, _filter: TraceFilter) -> Result, Error> { + Err(errors::light_unimplemented(None)) + } + + fn block_traces(&self, _block_number: BlockNumber) -> Result, Error> { + Err(errors::light_unimplemented(None)) + } + + fn transaction_traces(&self, _transaction_hash: H256) -> Result, Error> { + Err(errors::light_unimplemented(None)) + } + + fn trace(&self, _transaction_hash: H256, _address: Vec) -> Result, Error> { + Err(errors::light_unimplemented(None)) + } + + fn call(&self, _request: CallRequest, _flags: Vec, _block: Trailing) -> Result, Error> { + Err(errors::light_unimplemented(None)) + } + + fn raw_transaction(&self, _raw_transaction: Bytes, _flags: Vec, _block: Trailing) -> Result, Error> { + Err(errors::light_unimplemented(None)) + } + + fn replay_transaction(&self, _transaction_hash: H256, _flags: Vec) -> Result, Error> { + Err(errors::light_unimplemented(None)) + } +} diff --git a/rpc/src/v1/impls/parity.rs b/rpc/src/v1/impls/parity.rs index 3fcc82c3a..820aa6670 100644 --- a/rpc/src/v1/impls/parity.rs +++ b/rpc/src/v1/impls/parity.rs @@ -232,8 +232,13 @@ impl Parity for ParityClient where Ok(Bytes::new(version_data())) } - fn gas_price_histogram(&self) -> Result { - take_weak!(self.client).gas_price_histogram(100, 10).ok_or_else(errors::not_enough_data).map(Into::into) + fn gas_price_histogram(&self) -> BoxFuture { + future::done(take_weakf!(self.client) + .gas_price_corpus(100) + .histogram(10) + .ok_or_else(errors::not_enough_data) + .map(Into::into) + ).boxed() } fn unsigned_transactions_count(&self) -> Result { @@ -312,16 +317,16 @@ impl Parity for ParityClient where .ok_or_else(|| errors::dapps_disabled()) } - fn next_nonce(&self, address: H160) -> Result { + fn next_nonce(&self, address: H160) -> BoxFuture { let address: Address = address.into(); - let miner = take_weak!(self.miner); - let client = take_weak!(self.client); + let miner = take_weakf!(self.miner); + let client = take_weakf!(self.client); - Ok(miner.last_nonce(&address) + future::ok(miner.last_nonce(&address) .map(|n| n + 1.into()) .unwrap_or_else(|| client.latest_nonce(&address)) .into() - ) + ).boxed() } fn mode(&self) -> Result { diff --git a/rpc/src/v1/traits/parity.rs b/rpc/src/v1/traits/parity.rs index d5ecbd5e6..10e3b54bd 100644 --- a/rpc/src/v1/traits/parity.rs +++ b/rpc/src/v1/traits/parity.rs @@ -101,8 +101,8 @@ build_rpc_trait! { fn default_extra_data(&self) -> Result; /// Returns distribution of gas price in latest blocks. - #[rpc(name = "parity_gasPriceHistogram")] - fn gas_price_histogram(&self) -> Result; + #[rpc(async, name = "parity_gasPriceHistogram")] + fn gas_price_histogram(&self) -> BoxFuture; /// Returns number of unsigned transactions waiting in the signer queue (if signer enabled) /// Returns error when signer is disabled @@ -164,8 +164,8 @@ build_rpc_trait! { fn dapps_interface(&self) -> Result; /// Returns next nonce for particular sender. Should include all transactions in the queue. - #[rpc(name = "parity_nextNonce")] - fn next_nonce(&self, H160) -> Result; + #[rpc(async, name = "parity_nextNonce")] + fn next_nonce(&self, H160) -> BoxFuture; /// Get the mode. Results one of: "active", "passive", "dark", "offline". #[rpc(name = "parity_mode")] diff --git a/rpc/src/v1/types/histogram.rs b/rpc/src/v1/types/histogram.rs index 6cec96469..55d8ae835 100644 --- a/rpc/src/v1/types/histogram.rs +++ b/rpc/src/v1/types/histogram.rs @@ -17,7 +17,6 @@ //! Gas prices histogram. use v1::types::U256; -use util::stats; /// Values of RPC settings. #[derive(Serialize, Deserialize)] @@ -27,11 +26,11 @@ pub struct Histogram { #[serde(rename="bucketBounds")] pub bucket_bounds: Vec, /// Transacion counts for each bucket. - pub counts: Vec, + pub counts: Vec, } -impl From for Histogram { - fn from(h: stats::Histogram) -> Self { +impl From<::stats::Histogram<::util::U256>> for Histogram { + fn from(h: ::stats::Histogram<::util::U256>) -> Self { Histogram { bucket_bounds: h.bucket_bounds.into_iter().map(Into::into).collect(), counts: h.counts diff --git a/sync/src/api.rs b/sync/src/api.rs index 9b1ace73b..4cdc9d37a 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -28,7 +28,7 @@ use ethcore::client::{BlockChainClient, ChainNotify}; use ethcore::snapshot::SnapshotService; use ethcore::header::BlockNumber; use sync_io::NetSyncIo; -use chain::{ChainSync, SyncStatus}; +use chain::{ChainSync, SyncStatus as EthSyncStatus}; use std::net::{SocketAddr, AddrParseError}; use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig}; use std::str::FromStr; @@ -82,12 +82,12 @@ impl Default for SyncConfig { } binary_fixed_size!(SyncConfig); -binary_fixed_size!(SyncStatus); +binary_fixed_size!(EthSyncStatus); /// Current sync status pub trait SyncProvider: Send + Sync { /// Get sync status - fn status(&self) -> SyncStatus; + fn status(&self) -> EthSyncStatus; /// Get peers information fn peers(&self) -> Vec; @@ -240,7 +240,7 @@ impl EthSync { #[cfg_attr(feature = "ipc", ipc(client_ident="SyncClient"))] impl SyncProvider for EthSync { /// Get sync status - fn status(&self) -> SyncStatus { + fn status(&self) -> EthSyncStatus { self.eth_handler.sync.write().status() } @@ -620,6 +620,35 @@ pub struct ServiceConfiguration { pub io_path: String, } +/// Numbers of peers (max, min, active). +#[derive(Debug, Clone)] +#[cfg_attr(feature = "ipc", binary)] +pub struct PeerNumbers { + /// Number of connected peers. + pub connected: usize, + /// Number of active peers. + pub active: usize, + /// Max peers. + pub max: usize, + /// Min peers. + pub min: usize, +} + +/// Light synchronization. +pub trait LightSyncProvider { + /// Get peer numbers. + fn peer_numbers(&self) -> PeerNumbers; + + /// Get peers information + fn peers(&self) -> Vec; + + /// Get the enode if available. + fn enode(&self) -> Option; + + /// Returns propagation count for pending transactions. + fn transactions_stats(&self) -> BTreeMap; +} + /// Configuration for the light sync. pub struct LightSyncParams { /// Network configuration. @@ -728,3 +757,46 @@ impl ManageNetwork for LightSync { } } +impl LightSyncProvider for LightSync { + fn peer_numbers(&self) -> PeerNumbers { + let (connected, active) = self.proto.peer_count(); + let config = self.network_config(); + PeerNumbers { + connected: connected, + active: active, + max: config.max_peers as usize, + min: config.min_peers as usize, + } + } + + fn peers(&self) -> Vec { + self.network.with_context_eval(self.subprotocol_name, |ctx| { + let peer_ids = self.network.connected_peers(); + + peer_ids.into_iter().filter_map(|peer_id| { + let session_info = match ctx.session_info(peer_id) { + None => return None, + Some(info) => info, + }; + + Some(PeerInfo { + id: session_info.id.map(|id| id.hex()), + client_version: session_info.client_version, + capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(), + remote_address: session_info.remote_address, + local_address: session_info.local_address, + eth_info: None, + les_info: self.proto.peer_status(&peer_id).map(Into::into), + }) + }).collect() + }).unwrap_or_else(Vec::new) + } + + fn enode(&self) -> Option { + self.network.external_url() + } + + fn transactions_stats(&self) -> BTreeMap { + Default::default() // TODO + } +} diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 8ea6705f2..6cd4fade5 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -72,11 +72,7 @@ mod api { #[cfg(not(feature = "ipc"))] mod api; -pub use api::{ - EthSync, Params, SyncProvider, ManageNetwork, SyncConfig, - ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP, TransactionStats, - LightSync, LightSyncParams, LesProtocolInfo, EthProtocolInfo, -}; +pub use api::*; pub use chain::{SyncStatus, SyncState}; pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError}; diff --git a/util/src/lib.rs b/util/src/lib.rs index 720b80869..b67154f7b 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -142,7 +142,6 @@ pub mod semantic_version; pub mod log; pub mod path; pub mod snappy; -pub mod stats; pub mod cache; mod timer; diff --git a/util/stats/Cargo.toml b/util/stats/Cargo.toml new file mode 100644 index 000000000..99e81c9e7 --- /dev/null +++ b/util/stats/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "stats" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +log = "0.3" diff --git a/util/src/stats.rs b/util/stats/src/lib.rs similarity index 52% rename from util/src/stats.rs rename to util/stats/src/lib.rs index c4c08ddc8..01c988232 100644 --- a/util/src/stats.rs +++ b/util/stats/src/lib.rs @@ -14,22 +14,80 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -//! Statistical functions. +//! Statistical functions and helpers. -use bigint::prelude::*; +use std::iter::FromIterator; +use std::ops::{Add, Sub, Deref, Div}; + +#[macro_use] +extern crate log; + +/// Sorted corpus of data. +#[derive(Debug, Clone, PartialEq)] +pub struct Corpus(Vec); + +impl From> for Corpus { + fn from(mut data: Vec) -> Self { + data.sort(); + Corpus(data) + } +} + +impl FromIterator for Corpus { + fn from_iter>(iterable: I) -> Self { + iterable.into_iter().collect::>().into() + } +} + +impl Deref for Corpus { + type Target = [T]; + + fn deref(&self) -> &[T] { &self.0[..] } +} + +impl Corpus { + /// Get the median element, if it exists. + pub fn median(&self) -> Option<&T> { + self.0.get(self.0.len() / 2) + } + + /// Whether the corpus is empty. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// Number of elements in the corpus. + pub fn len(&self) -> usize { + self.0.len() + } +} + +impl Corpus + where T: Add + Sub + Div + From +{ + /// Create a histogram of this corpus if it at least spans the buckets. Bounds are left closed. + /// Excludes outliers. + pub fn histogram(&self, bucket_number: usize) -> Option> { + // TODO: get outliers properly. + let upto = self.len() - self.len() / 40; + Histogram::create(&self.0[..upto], bucket_number) + } +} /// Discretised histogram. #[derive(Debug, PartialEq)] -pub struct Histogram { +pub struct Histogram { /// Bounds of each bucket. - pub bucket_bounds: Vec, + pub bucket_bounds: Vec, /// Count within each bucket. - pub counts: Vec + pub counts: Vec, } -impl Histogram { - /// Histogram of a sorted corpus if it at least spans the buckets. Bounds are left closed. - pub fn new(corpus: &[U256], bucket_number: usize) -> Option { +impl Histogram + where T: Add + Sub + Div + From +{ + // Histogram of a sorted corpus if it at least spans the buckets. Bounds are left closed. + fn create(corpus: &[T], bucket_number: usize) -> Option> { if corpus.len() < 1 { return None; } let corpus_end = corpus.last().expect("there is at least 1 element; qed").clone(); let corpus_start = corpus.first().expect("there is at least 1 element; qed").clone(); @@ -63,42 +121,41 @@ impl Histogram { #[cfg(test)] mod tests { - use bigint::prelude::U256; use super::Histogram; #[test] fn check_histogram() { - let hist = Histogram::new(slice_into![643,689,1408,2000,2296,2512,4250,4320,4842,4958,5804,6065,6098,6354,7002,7145,7845,8589,8593,8895], 5).unwrap(); - let correct_bounds: Vec = vec_into![643, 2294, 3945, 5596, 7247, 8898]; + let hist = Histogram::create(&[643,689,1408,2000,2296,2512,4250,4320,4842,4958,5804,6065,6098,6354,7002,7145,7845,8589,8593,8895], 5).unwrap(); + let correct_bounds: Vec = vec![643, 2294, 3945, 5596, 7247, 8898]; assert_eq!(Histogram { bucket_bounds: correct_bounds, counts: vec![4,2,4,6,4] }, hist); } #[test] fn smaller_data_range_than_bucket_range() { assert_eq!( - Histogram::new(slice_into![1, 2, 2], 3), - Some(Histogram { bucket_bounds: vec_into![1, 2, 3, 4], counts: vec![1, 2, 0] }) + Histogram::create(&[1, 2, 2], 3), + Some(Histogram { bucket_bounds: vec![1, 2, 3, 4], counts: vec![1, 2, 0] }) ); } #[test] fn data_range_is_not_multiple_of_bucket_range() { assert_eq!( - Histogram::new(slice_into![1, 2, 5], 2), - Some(Histogram { bucket_bounds: vec_into![1, 4, 7], counts: vec![2, 1] }) + Histogram::create(&[1, 2, 5], 2), + Some(Histogram { bucket_bounds: vec![1, 4, 7], counts: vec![2, 1] }) ); } #[test] fn data_range_is_multiple_of_bucket_range() { assert_eq!( - Histogram::new(slice_into![1, 2, 6], 2), - Some(Histogram { bucket_bounds: vec_into![1, 4, 7], counts: vec![2, 1] }) + Histogram::create(&[1, 2, 6], 2), + Some(Histogram { bucket_bounds: vec![1, 4, 7], counts: vec![2, 1] }) ); } #[test] fn none_when_too_few_data() { - assert!(Histogram::new(slice_into![], 1).is_none()); + assert!(Histogram::::create(&[], 1).is_none()); } }