diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md new file mode 100644 index 000000000..f4b5311d5 --- /dev/null +++ b/.github/ISSUE_TEMPLATE.md @@ -0,0 +1,12 @@ +_Before filing a new issue, please **provide the following information**._ + +> I'm running: +> +> - **Parity version**: 0.0.0 +> - **Operating system**: Windows / MacOS / Linux +> - **And installed**: via installer / homebrew / binaries / from source + +_Your issue description goes here below. Try to include **actual** vs. **expected behavior** and **steps to reproduce** the issue._ + +--- + diff --git a/Cargo.lock b/Cargo.lock index 946507428..e2b4da1eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -723,6 +723,7 @@ dependencies = [ "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.10.5 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "native-contracts 0.1.0", "parking_lot 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1580,7 +1581,7 @@ version = "0.1.0" dependencies = [ "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethabi 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "ethcore-util 1.8.0", + "ethcore-bigint 0.1.3", "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "native-contract-generator 0.1.0", ] @@ -1786,7 +1787,7 @@ dependencies = [ [[package]] name = "parity" -version = "1.7.0" +version = "1.8.0" dependencies = [ "ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "app_dirs 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2052,7 +2053,7 @@ dependencies = [ [[package]] name = "parity-ui-precompiled" version = "1.4.0" -source = "git+https://github.com/paritytech/js-precompiled.git#dfb9367a495d5ca3eac3c92a4197cf8652756d37" +source = "git+https://github.com/paritytech/js-precompiled.git#dd9b92d9d8c244678e15163347f9adb2e2560959" dependencies = [ "parity-dapps-glue 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2208,10 +2209,10 @@ dependencies = [ name = "price-info" version = "1.7.0" dependencies = [ - "ethcore-util 1.8.0", "fetch 0.1.0", "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/Cargo.toml b/Cargo.toml index 03d9c9664..3adaf62d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] description = "Parity Ethereum client" name = "parity" -version = "1.7.0" +version = "1.8.0" license = "GPL-3.0" authors = ["Parity Technologies "] build = "build.rs" diff --git a/README.md b/README.md index 37b06f01c..16758bc27 100644 --- a/README.md +++ b/README.md @@ -1,55 +1,41 @@ -# [Parity](https://parity.io/parity.html) -### Fast, light, and robust Ethereum implementation +# [Parity](https://parity.io/parity.html) - fast, light, and robust Ethereum client -### [Download latest release](https://github.com/paritytech/parity/releases) +[![build status](https://gitlab.parity.io/parity/parity/badges/master/build.svg)](https://gitlab.parity.io/parity/parity/commits/master) +[![Snap 
Status](https://build.snapcraft.io/badge/paritytech/parity.svg)](https://build.snapcraft.io/user/paritytech/parity) +[![GPLv3](https://img.shields.io/badge/license-GPL%20v3-green.svg)](https://www.gnu.org/licenses/gpl-3.0.en.html) -[![build status](https://gitlab.parity.io/parity/parity/badges/master/build.svg)](https://gitlab.parity.io/parity/parity/commits/master) [![Coverage Status][coveralls-image]][coveralls-url] [![GPLv3][license-image]][license-url] [![Snap Status](https://build.snapcraft.io/badge/paritytech/parity.svg)](https://build.snapcraft.io/user/paritytech/parity) +- [Download the latest release here.](https://github.com/paritytech/parity/releases) ### Join the chat! -Parity [![Join the chat at https://gitter.im/ethcore/parity][gitter-image]][gitter-url] and -parity.js [![Join the chat at https://gitter.im/ethcore/parity.js](https://badges.gitter.im/ethcore/parity.js.svg)](https://gitter.im/ethcore/parity.js?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) - -[Internal Documentation][doc-url] - - -Be sure to check out [our wiki][wiki-url] for more information. - -[coveralls-image]: https://coveralls.io/repos/github/paritytech/parity/badge.svg?branch=master -[coveralls-url]: https://coveralls.io/github/paritytech/parity?branch=master -[gitter-image]: https://badges.gitter.im/Join%20Chat.svg -[gitter-url]: https://gitter.im/ethcore/parity?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge -[license-image]: https://img.shields.io/badge/license-GPL%20v3-green.svg -[license-url]: https://www.gnu.org/licenses/gpl-3.0.en.html -[doc-url]: https://paritytech.github.io/parity/ethcore/index.html -[wiki-url]: https://github.com/paritytech/parity/wiki +Get in touch with us on Gitter: +[![Gitter: Parity](https://img.shields.io/badge/gitter-parity-4AB495.svg)](https://gitter.im/paritytech/parity) +[![Gitter: Parity.js](https://img.shields.io/badge/gitter-parity.js-4AB495.svg)](https://gitter.im/paritytech/parity.js) +[![Gitter: Parity/Miners](https://img.shields.io/badge/gitter-parity/miners-4AB495.svg)](https://gitter.im/paritytech/parity/miners) +[![Gitter: Parity-PoA](https://img.shields.io/badge/gitter-parity--poa-4AB495.svg)](https://gitter.im/paritytech/parity-poa) +Be sure to check out [our wiki](https://github.com/paritytech/parity/wiki) and the [internal documentation](https://paritytech.github.io/parity/ethcore/index.html) for more information. ---- - ## About Parity -Parity's goal is to be the fastest, lightest, and most secure Ethereum client. We are developing Parity using the sophisticated and -cutting-edge Rust programming language. Parity is licensed under the GPLv3, and can be used for all your Ethereum needs. +Parity's goal is to be the fastest, lightest, and most secure Ethereum client. We are developing Parity using the sophisticated and cutting-edge Rust programming language. Parity is licensed under the GPLv3, and can be used for all your Ethereum needs. + +Parity comes with a built-in wallet. To access [Parity Wallet](http://web3.site/) simply go to http://web3.site/ (if you don't have access to the internet, but still want to use the service, you can also use http://127.0.0.1:8180/). It includes various functionality allowing you to: -Parity comes with a built-in wallet. To access [Parity Wallet](http://web3.site/) simply go to http://web3.site/ (if you don't have access to the internet, but still want to use the service, you can also use http://127.0.0.1:8180/). 
It -includes various functionality allowing you to: - create and manage your Ethereum accounts; - manage your Ether and any Ethereum tokens; - create and register your own tokens; - and much more. -By default, Parity will also run a JSONRPC server on `127.0.0.1:8545`. This is fully configurable and supports a number -of RPC APIs. +By default, Parity will also run a JSONRPC server on `127.0.0.1:8545`. This is fully configurable and supports a number of RPC APIs. -If you run into an issue while using parity, feel free to file one in this repository -or hop on our [gitter chat room][gitter-url] to ask a question. We are glad to help! +If you run into an issue while using parity, feel free to file one in this repository or hop on our [gitter chat room](https://gitter.im/paritytech/parity) to ask a question. We are glad to help! **For security-critical issues**, please refer to the security policy outlined in `SECURITY.MD`. -Parity's current release is 1.6. You can download it at https://github.com/paritytech/parity/releases or follow the instructions -below to build from source. +Parity's current release is 1.7. You can download it at https://github.com/paritytech/parity/releases or follow the instructions below to build from source. ---- diff --git a/dapps/src/api/api.rs b/dapps/src/api/api.rs index 3f1c50de8..7bd7fa049 100644 --- a/dapps/src/api/api.rs +++ b/dapps/src/api/api.rs @@ -21,7 +21,7 @@ use hyper::method::Method; use hyper::status::StatusCode; use api::{response, types}; -use api::time::TimeChecker; +use api::time::{TimeChecker, MAX_DRIFT}; use apps::fetcher::Fetcher; use handlers::{self, extract_url}; use endpoint::{Endpoint, Handler, EndpointPath}; @@ -122,7 +122,6 @@ impl RestApiRouter { // Check time let time = { - const MAX_DRIFT: i64 = 500; let (status, message, details) = match time { Ok(Ok(diff)) if diff < MAX_DRIFT && diff > -MAX_DRIFT => { (HealthStatus::Ok, "".into(), diff) diff --git a/dapps/src/api/time.rs b/dapps/src/api/time.rs index 3117f4cc9..06b9cee7f 100644 --- a/dapps/src/api/time.rs +++ b/dapps/src/api/time.rs @@ -33,11 +33,13 @@ use std::io; use std::{fmt, mem, time}; -use std::sync::Arc; use std::collections::VecDeque; +use std::sync::atomic::{self, AtomicUsize}; +use std::sync::Arc; use futures::{self, Future, BoxFuture}; -use futures_cpupool::CpuPool; +use futures::future::{self, IntoFuture}; +use futures_cpupool::{CpuPool, CpuFuture}; use ntp; use time::{Duration, Timespec}; use util::RwLock; @@ -45,6 +47,8 @@ use util::RwLock; /// Time checker error. #[derive(Debug, Clone, PartialEq)] pub enum Error { + /// No servers are currently available for a query. + NoServersAvailable, /// There was an error when trying to reach the NTP server. Ntp(String), /// IO error when reading NTP response. @@ -56,6 +60,7 @@ impl fmt::Display for Error { use self::Error::*; match *self { + NoServersAvailable => write!(fmt, "No NTP servers available"), Ntp(ref err) => write!(fmt, "NTP error: {}", err), Io(ref err) => write!(fmt, "Connection Error: {}", err), } @@ -72,58 +77,123 @@ impl From for Error { /// NTP time drift checker. pub trait Ntp { + /// Returned Future. + type Future: IntoFuture; + /// Returns the current time drift. 
- fn drift(&self) -> BoxFuture; + fn drift(&self) -> Self::Future; +} + +const SERVER_MAX_POLL_INTERVAL_SECS: u64 = 60; +#[derive(Debug)] +struct Server { + pub address: String, + next_call: RwLock, + failures: AtomicUsize, +} + +impl Server { + pub fn is_available(&self) -> bool { + *self.next_call.read() < time::Instant::now() + } + + pub fn report_success(&self) { + self.failures.store(0, atomic::Ordering::SeqCst); + self.update_next_call(1) + } + + pub fn report_failure(&self) { + let errors = self.failures.fetch_add(1, atomic::Ordering::SeqCst); + self.update_next_call(1 << errors) + } + + fn update_next_call(&self, delay: usize) { + *self.next_call.write() = time::Instant::now() + time::Duration::from_secs(delay as u64 * SERVER_MAX_POLL_INTERVAL_SECS); + } +} + +impl> From for Server { + fn from(t: T) -> Self { + Server { + address: t.as_ref().to_owned(), + next_call: RwLock::new(time::Instant::now()), + failures: Default::default(), + } + } } /// NTP client using the SNTP algorithm for calculating drift. #[derive(Clone)] pub struct SimpleNtp { - address: Arc, + addresses: Vec>, pool: CpuPool, } impl fmt::Debug for SimpleNtp { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Ntp {{ address: {} }}", self.address) + f + .debug_struct("SimpleNtp") + .field("addresses", &self.addresses) + .finish() } } impl SimpleNtp { - fn new(address: &str, pool: CpuPool) -> SimpleNtp { + fn new>(addresses: &[T], pool: CpuPool) -> SimpleNtp { SimpleNtp { - address: Arc::new(address.to_owned()), + addresses: addresses.iter().map(Server::from).map(Arc::new).collect(), pool: pool, } } } impl Ntp for SimpleNtp { - fn drift(&self) -> BoxFuture { - let address = self.address.clone(); - if &*address == "none" { - return futures::future::err(Error::Ntp("NTP server is not provided.".into())).boxed(); - } + type Future = future::Either< + CpuFuture, + future::FutureResult, + >; - self.pool.spawn_fn(move || { - let packet = ntp::request(&*address)?; - let dest_time = ::time::now_utc().to_timespec(); - let orig_time = Timespec::from(packet.orig_time); - let recv_time = Timespec::from(packet.recv_time); - let transmit_time = Timespec::from(packet.transmit_time); + fn drift(&self) -> Self::Future { + use self::future::Either::{A, B}; - let drift = ((recv_time - orig_time) + (transmit_time - dest_time)) / 2; + let server = self.addresses.iter().find(|server| server.is_available()); + server.map(|server| { + let server = server.clone(); + A(self.pool.spawn_fn(move || { + debug!(target: "dapps", "Fetching time from {}.", server.address); - Ok(drift) - }).boxed() + match ntp::request(&server.address) { + Ok(packet) => { + let dest_time = ::time::now_utc().to_timespec(); + let orig_time = Timespec::from(packet.orig_time); + let recv_time = Timespec::from(packet.recv_time); + let transmit_time = Timespec::from(packet.transmit_time); + + let drift = ((recv_time - orig_time) + (transmit_time - dest_time)) / 2; + + server.report_success(); + Ok(drift) + }, + Err(err) => { + server.report_failure(); + Err(err.into()) + }, + } + })) + }).unwrap_or_else(|| B(future::err(Error::NoServersAvailable))) } } // NOTE In a positive scenario first results will be seen after: -// MAX_RESULTS * UPDATE_TIMEOUT_OK_SECS seconds. -const MAX_RESULTS: usize = 7; -const UPDATE_TIMEOUT_OK_SECS: u64 = 30; -const UPDATE_TIMEOUT_ERR_SECS: u64 = 2; +// MAX_RESULTS * UPDATE_TIMEOUT_INCOMPLETE_SECS seconds. 
+const MAX_RESULTS: usize = 4; +const UPDATE_TIMEOUT_OK_SECS: u64 = 6 * 60 * 60; +const UPDATE_TIMEOUT_WARN_SECS: u64 = 15 * 60; +const UPDATE_TIMEOUT_ERR_SECS: u64 = 60; +const UPDATE_TIMEOUT_INCOMPLETE_SECS: u64 = 10; + +/// Maximal valid time drift. +pub const MAX_DRIFT: i64 = 500; #[derive(Debug, Clone)] /// A time checker. @@ -134,13 +204,13 @@ pub struct TimeChecker { impl TimeChecker { /// Creates new time checker given the NTP server address. - pub fn new(ntp_address: String, pool: CpuPool) -> Self { + pub fn new>(ntp_addresses: &[T], pool: CpuPool) -> Self { let last_result = Arc::new(RwLock::new( // Assume everything is ok at the very beginning. (time::Instant::now(), vec![Ok(0)].into()) )); - let ntp = SimpleNtp::new(&ntp_address, pool); + let ntp = SimpleNtp::new(ntp_addresses, pool); TimeChecker { ntp, @@ -149,22 +219,34 @@ impl TimeChecker { } } -impl TimeChecker { +impl TimeChecker where ::Future: Send + 'static { /// Updates the time pub fn update(&self) -> BoxFuture { + trace!(target: "dapps", "Updating time from NTP."); let last_result = self.last_result.clone(); - self.ntp.drift().then(move |res| { + self.ntp.drift().into_future().then(move |res| { + let res = res.map(|d| d.num_milliseconds()); + + if let Err(Error::NoServersAvailable) = res { + debug!(target: "dapps", "No NTP servers available. Selecting an older result."); + return select_result(last_result.read().1.iter()); + } + + // Update the results. let mut results = mem::replace(&mut last_result.write().1, VecDeque::new()); + let has_all_results = results.len() >= MAX_RESULTS; let valid_till = time::Instant::now() + time::Duration::from_secs( - if res.is_ok() && results.len() == MAX_RESULTS { - UPDATE_TIMEOUT_OK_SECS - } else { - UPDATE_TIMEOUT_ERR_SECS + match res { + Ok(time) if has_all_results && time < MAX_DRIFT => UPDATE_TIMEOUT_OK_SECS, + Ok(_) if has_all_results => UPDATE_TIMEOUT_WARN_SECS, + Err(_) if has_all_results => UPDATE_TIMEOUT_ERR_SECS, + _ => UPDATE_TIMEOUT_INCOMPLETE_SECS, } ); + trace!(target: "dapps", "New time drift received: {:?}", res); // Push the result. - results.push_back(res.map(|d| d.num_milliseconds())); + results.push_back(res); while results.len() > MAX_RESULTS { results.pop_front(); } @@ -209,7 +291,7 @@ mod tests { use std::cell::{Cell, RefCell}; use std::time::Instant; use time::Duration; - use futures::{self, BoxFuture, Future}; + use futures::{future, Future}; use super::{Ntp, TimeChecker, Error}; use util::RwLock; @@ -224,9 +306,11 @@ mod tests { } impl Ntp for FakeNtp { - fn drift(&self) -> BoxFuture { + type Future = future::FutureResult; + + fn drift(&self) -> Self::Future { self.1.set(self.1.get() + 1); - futures::future::ok(self.0.borrow_mut().pop().expect("Unexpected call to drift().")).boxed() + future::ok(self.0.borrow_mut().pop().expect("Unexpected call to drift().")) } } diff --git a/dapps/src/lib.rs b/dapps/src/lib.rs index 0cb7024cc..f34c24cae 100644 --- a/dapps/src/lib.rs +++ b/dapps/src/lib.rs @@ -130,7 +130,7 @@ impl Middleware { /// Creates new middleware for UI server. pub fn ui( - ntp_server: &str, + ntp_servers: &[String], pool: CpuPool, remote: Remote, dapps_domain: &str, @@ -146,7 +146,7 @@ impl Middleware { ).embeddable_on(None).allow_dapps(false)); let special = { let mut special = special_endpoints( - ntp_server, + ntp_servers, pool, content_fetcher.clone(), remote.clone(), @@ -171,7 +171,7 @@ impl Middleware { /// Creates new Dapps server middleware. 
pub fn dapps( - ntp_server: &str, + ntp_servers: &[String], pool: CpuPool, remote: Remote, ui_address: Option<(String, u16)>, @@ -203,7 +203,7 @@ impl Middleware { let special = { let mut special = special_endpoints( - ntp_server, + ntp_servers, pool, content_fetcher.clone(), remote.clone(), @@ -237,8 +237,8 @@ impl http::RequestMiddleware for Middleware { } } -fn special_endpoints( - ntp_server: &str, +fn special_endpoints>( + ntp_servers: &[T], pool: CpuPool, content_fetcher: Arc, remote: Remote, @@ -250,7 +250,7 @@ fn special_endpoints( special.insert(router::SpecialEndpoint::Api, Some(api::RestApi::new( content_fetcher, sync_status, - api::TimeChecker::new(ntp_server.into(), pool), + api::TimeChecker::new(ntp_servers, pool), remote, ))); special diff --git a/dapps/src/tests/helpers/mod.rs b/dapps/src/tests/helpers/mod.rs index 2d9d5f341..38dd82de6 100644 --- a/dapps/src/tests/helpers/mod.rs +++ b/dapps/src/tests/helpers/mod.rs @@ -255,7 +255,7 @@ impl Server { fetch: F, ) -> Result { let middleware = Middleware::dapps( - "pool.ntp.org:123", + &["0.pool.ntp.org:123".into(), "1.pool.ntp.org:123".into()], CpuPool::new(4), remote, signer_address, diff --git a/ethcore/native_contracts/Cargo.toml b/ethcore/native_contracts/Cargo.toml index 5dc18c8f5..2f91a4848 100644 --- a/ethcore/native_contracts/Cargo.toml +++ b/ethcore/native_contracts/Cargo.toml @@ -9,7 +9,7 @@ build = "build.rs" ethabi = "2.0" futures = "0.1" byteorder = "1.0" -ethcore-util = { path = "../../util" } +ethcore-bigint = { path = "../../util/bigint" } [build-dependencies] native-contract-generator = { path = "generator" } diff --git a/ethcore/native_contracts/build.rs b/ethcore/native_contracts/build.rs index cec830929..bcb64067c 100644 --- a/ethcore/native_contracts/build.rs +++ b/ethcore/native_contracts/build.rs @@ -21,6 +21,7 @@ use std::fs::File; use std::io::Write; // TODO: just walk the "res" directory and generate whole crate automatically. +const KEY_SERVER_SET_ABI: &'static str = include_str!("res/key_server_set.json"); const REGISTRY_ABI: &'static str = include_str!("res/registrar.json"); const URLHINT_ABI: &'static str = include_str!("res/urlhint.json"); const SERVICE_TRANSACTION_ABI: &'static str = include_str!("res/service_transaction.json"); @@ -45,6 +46,7 @@ fn build_test_contracts() { } fn main() { + build_file("KeyServerSet", KEY_SERVER_SET_ABI, "key_server_set.rs"); build_file("Registry", REGISTRY_ABI, "registry.rs"); build_file("Urlhint", URLHINT_ABI, "urlhint.rs"); build_file("ServiceTransactionChecker", SERVICE_TRANSACTION_ABI, "service_transaction.rs"); diff --git a/ethcore/native_contracts/generator/src/lib.rs b/ethcore/native_contracts/generator/src/lib.rs index 793ad6085..996ee4969 100644 --- a/ethcore/native_contracts/generator/src/lib.rs +++ b/ethcore/native_contracts/generator/src/lib.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . //! Rust code contract generator. -//! The code generated will require a dependence on the `ethcore-util`, +//! The code generated will require a dependence on the `ethcore-bigint::prelude`, //! `ethabi`, `byteorder`, and `futures` crates. //! This currently isn't hygienic, so compilation of generated code may fail //! due to missing crates or name collisions. 
This will change when @@ -48,14 +48,14 @@ pub fn generate_module(struct_name: &str, abi: &str) -> Result { use byteorder::{{BigEndian, ByteOrder}}; use futures::{{future, Future, IntoFuture, BoxFuture}}; use ethabi::{{Contract, Interface, Token, Event}}; -use util; +use bigint; /// Generated Rust bindings to an Ethereum contract. #[derive(Clone, Debug)] pub struct {name} {{ contract: Contract, /// Address to make calls to. - pub address: util::Address, + pub address: bigint::prelude::H160, }} const ABI: &'static str = r#"{abi_str}"#; @@ -63,7 +63,7 @@ const ABI: &'static str = r#"{abi_str}"#; impl {name} {{ /// Create a new instance of `{name}` with an address. /// Calls can be made, given a callback for dispatching calls asynchronously. - pub fn new(address: util::Address) -> Self {{ + pub fn new(address: bigint::prelude::H160) -> Self {{ let contract = Contract::new(Interface::load(ABI.as_bytes()) .expect("ABI checked at generation-time; qed")); {name} {{ @@ -108,7 +108,7 @@ fn generate_functions(contract: &Contract) -> Result { /// Outputs: {abi_outputs:?} pub fn {snake_name}(&self, call: F, {params}) -> BoxFuture<{output_type}, String> where - F: FnOnce(util::Address, Vec) -> U, + F: FnOnce(bigint::prelude::H160, Vec) -> U, U: IntoFuture, Error=String>, U::Future: Send + 'static {{ @@ -217,8 +217,8 @@ fn output_params_codegen(outputs: &[ParamType]) -> Result<(String, String), Para // create code for an argument type from param type. fn rust_type(input: ParamType) -> Result { Ok(match input { - ParamType::Address => "util::Address".into(), - ParamType::FixedBytes(len) if len <= 32 => format!("util::H{}", len * 8), + ParamType::Address => "bigint::prelude::H160".into(), + ParamType::FixedBytes(len) if len <= 32 => format!("bigint::prelude::H{}", len * 8), ParamType::Bytes | ParamType::FixedBytes(_) => "Vec".into(), ParamType::Int(width) => match width { 8 | 16 | 32 | 64 => format!("i{}", width), @@ -226,7 +226,7 @@ fn rust_type(input: ParamType) -> Result { }, ParamType::Uint(width) => match width { 8 | 16 | 32 | 64 => format!("u{}", width), - 128 | 160 | 256 => format!("util::U{}", width), + 128 | 160 | 256 => format!("bigint::prelude::U{}", width), _ => return Err(ParamType::Uint(width)), }, ParamType::Bool => "bool".into(), @@ -259,8 +259,8 @@ fn tokenize(name: &str, input: ParamType) -> (bool, String) { }, ParamType::Uint(width) => format!( "let mut r = [0; 32]; {}.to_big_endian(&mut r); Token::Uint(r)", - if width <= 64 { format!("util::U256::from({} as u64)", name) } - else { format!("util::U256::from({})", name) } + if width <= 64 { format!("bigint::prelude::U256::from({} as u64)", name) } + else { format!("bigint::prelude::U256::from({})", name) } ), ParamType::Bool => format!("Token::Bool({})", name), ParamType::String => format!("Token::String({})", name), @@ -281,11 +281,11 @@ fn tokenize(name: &str, input: ParamType) -> (bool, String) { // panics on unsupported types. fn detokenize(name: &str, output_type: ParamType) -> String { match output_type { - ParamType::Address => format!("{}.to_address().map(util::H160)", name), + ParamType::Address => format!("{}.to_address().map(bigint::prelude::H160)", name), ParamType::Bytes => format!("{}.to_bytes()", name), ParamType::FixedBytes(len) if len <= 32 => { // ensure no panic on slice too small. 
- let read_hash = format!("b.resize({}, 0); util::H{}::from_slice(&b[..{}])", + let read_hash = format!("b.resize({}, 0); bigint::prelude::H{}::from_slice(&b[..{}])", len, len * 8, len); format!("{}.to_fixed_bytes().map(|mut b| {{ {} }})", @@ -302,8 +302,8 @@ fn detokenize(name: &str, output_type: ParamType) -> String { } ParamType::Uint(width) => { let read_uint = match width { - 8 | 16 | 32 | 64 => format!("util::U256(u).low_u64() as u{}", width), - _ => format!("util::U{}::from(&u[..])", width), + 8 | 16 | 32 | 64 => format!("bigint::prelude::U256(u).low_u64() as u{}", width), + _ => format!("bigint::prelude::U{}::from(&u[..])", width), }; format!("{}.to_uint().map(|u| {})", name, read_uint) @@ -328,30 +328,30 @@ mod tests { #[test] fn input_types() { assert_eq!(::input_params_codegen(&[]).unwrap().0, ""); - assert_eq!(::input_params_codegen(&[ParamType::Address]).unwrap().0, "param_0: util::Address, "); + assert_eq!(::input_params_codegen(&[ParamType::Address]).unwrap().0, "param_0: bigint::prelude::H160, "); assert_eq!(::input_params_codegen(&[ParamType::Address, ParamType::Bytes]).unwrap().0, - "param_0: util::Address, param_1: Vec, "); + "param_0: bigint::prelude::H160, param_1: Vec, "); } #[test] fn output_types() { assert_eq!(::output_params_codegen(&[]).unwrap().0, "()"); - assert_eq!(::output_params_codegen(&[ParamType::Address]).unwrap().0, "(util::Address)"); + assert_eq!(::output_params_codegen(&[ParamType::Address]).unwrap().0, "(bigint::prelude::H160)"); assert_eq!(::output_params_codegen(&[ParamType::Address, ParamType::Array(Box::new(ParamType::Bytes))]).unwrap().0, - "(util::Address, Vec>)"); + "(bigint::prelude::H160, Vec>)"); } #[test] fn rust_type() { - assert_eq!(::rust_type(ParamType::FixedBytes(32)).unwrap(), "util::H256"); + assert_eq!(::rust_type(ParamType::FixedBytes(32)).unwrap(), "bigint::prelude::H256"); assert_eq!(::rust_type(ParamType::Array(Box::new(ParamType::FixedBytes(32)))).unwrap(), - "Vec"); + "Vec"); assert_eq!(::rust_type(ParamType::Uint(64)).unwrap(), "u64"); assert!(::rust_type(ParamType::Uint(63)).is_err()); assert_eq!(::rust_type(ParamType::Int(32)).unwrap(), "i32"); - assert_eq!(::rust_type(ParamType::Uint(256)).unwrap(), "util::U256"); + assert_eq!(::rust_type(ParamType::Uint(256)).unwrap(), "bigint::prelude::U256"); } // codegen tests will need bootstrapping of some kind. 
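
The generator rewrite above switches the emitted bindings from `ethcore-util` types to `ethcore-bigint::prelude` types, and `build.rs` now also generates a `KeyServerSet` binding from the ABI added just below. As a rough sketch of how such a binding is driven — mirroring the `check_permissions`/`do_call` pattern used by `secret_store` later in this diff — and assuming the generator's snake_case naming yields a `get_key_servers` method returning `Vec<H160>` for the `getKeyServers` ABI entry (the real generated signature may differ):

```rust
// Hypothetical caller of the (re)generated KeyServerSet binding; names other
// than `KeyServerSet::new` are assumptions based on the generator templates.
extern crate ethcore_bigint as bigint;
extern crate futures;
extern crate native_contracts;

use bigint::prelude::H160;
use futures::{future, Future};
use native_contracts::KeyServerSet;

/// Read the current key-server list from the contract at `contract_address`,
/// given any way of executing a raw contract call (address + encoded input).
fn read_key_servers<F>(call_contract: F, contract_address: H160) -> Result<Vec<H160>, String>
    where F: FnOnce(H160, Vec<u8>) -> Result<Vec<u8>, String>
{
    // Generated constructors now take a bigint H160 instead of util::Address.
    let contract = KeyServerSet::new(contract_address);

    // Each generated method accepts a dispatch closure and returns a future;
    // wrapping the synchronous call in `future::done` matches how the ACL
    // storage drives `check_permissions` elsewhere in this diff.
    contract
        .get_key_servers(move |addr, data| future::done(call_contract(addr, data)))
        .wait()
}
```

Keeping the transport behind a closure is what lets the same generated code be driven by a full `ethcore` client (`call_contract(BlockId::Latest, ...)`) or by a simple stub in tests.
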
diff --git a/ethcore/native_contracts/res/key_server_set.json b/ethcore/native_contracts/res/key_server_set.json new file mode 100644 index 000000000..93f68837a --- /dev/null +++ b/ethcore/native_contracts/res/key_server_set.json @@ -0,0 +1 @@ +[{"constant":true,"inputs":[{"name":"","type":"uint256"}],"name":"keyServersList","outputs":[{"name":"","type":"address"}],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"_new","type":"address"}],"name":"setOwner","outputs":[],"payable":false,"type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getKeyServerPublic","outputs":[{"name":"","type":"bytes"}],"payable":false,"type":"function"},{"constant":true,"inputs":[],"name":"getKeyServers","outputs":[{"name":"","type":"address[]"}],"payable":false,"type":"function"},{"constant":true,"inputs":[],"name":"owner","outputs":[{"name":"","type":"address"}],"payable":false,"type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getKeyServerAddress","outputs":[{"name":"","type":"string"}],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"keyServer","type":"address"}],"name":"removeKeyServer","outputs":[],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"keyServerPublic","type":"bytes"},{"name":"keyServerIp","type":"string"}],"name":"addKeyServer","outputs":[],"payable":false,"type":"function"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerAdded","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerRemoved","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"old","type":"address"},{"indexed":true,"name":"current","type":"address"}],"name":"NewOwner","type":"event"}] \ No newline at end of file diff --git a/ethcore/native_contracts/src/key_server_set.rs b/ethcore/native_contracts/src/key_server_set.rs new file mode 100644 index 000000000..60b137aae --- /dev/null +++ b/ethcore/native_contracts/src/key_server_set.rs @@ -0,0 +1,21 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +#![allow(unused_mut, unused_variables, unused_imports)] + +//! Secret store Key Server set contract. 
+ +include!(concat!(env!("OUT_DIR"), "/key_server_set.rs")); diff --git a/ethcore/native_contracts/src/lib.rs b/ethcore/native_contracts/src/lib.rs index e35a4ec19..58875f8a2 100644 --- a/ethcore/native_contracts/src/lib.rs +++ b/ethcore/native_contracts/src/lib.rs @@ -21,8 +21,9 @@ extern crate futures; extern crate byteorder; extern crate ethabi; -extern crate ethcore_util as util; +extern crate ethcore_bigint as bigint; +mod key_server_set; mod registry; mod urlhint; mod service_transaction; @@ -32,6 +33,7 @@ mod validator_report; pub mod test_contracts; +pub use self::key_server_set::KeyServerSet; pub use self::registry::Registry; pub use self::urlhint::Urlhint; pub use self::service_transaction::ServiceTransactionChecker; diff --git a/ethcore/src/engines/authority_round/mod.rs b/ethcore/src/engines/authority_round/mod.rs index 18389f168..e7284bfbe 100644 --- a/ethcore/src/engines/authority_round/mod.rs +++ b/ethcore/src/engines/authority_round/mod.rs @@ -690,6 +690,7 @@ impl Engine for AuthorityRound { // apply immediate transitions. if let Some(change) = self.validators.is_epoch_end(first, chain_head) { + let change = combine_proofs(chain_head.number(), &change, &[]); return Some(change) } diff --git a/ethcore/src/engines/tendermint/mod.rs b/ethcore/src/engines/tendermint/mod.rs index 1c962d633..cc75e99c3 100644 --- a/ethcore/src/engines/tendermint/mod.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -643,6 +643,7 @@ impl Engine for Tendermint { let first = chain_head.number() == 0; if let Some(change) = self.validators.is_epoch_end(first, chain_head) { + let change = combine_proofs(chain_head.number(), &change, &[]); return Some(change) } else if let Some(pending) = transition_store(chain_head.hash()) { let signal_number = chain_head.number(); diff --git a/js/package-lock.json b/js/package-lock.json index 081828721..7054165c9 100644 --- a/js/package-lock.json +++ b/js/package-lock.json @@ -1,6 +1,6 @@ { "name": "parity.js", - "version": "1.8.10", + "version": "1.8.11", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/js/package.json b/js/package.json index a786df07d..828fb672d 100644 --- a/js/package.json +++ b/js/package.json @@ -1,6 +1,6 @@ { "name": "parity.js", - "version": "1.8.10", + "version": "1.8.11", "main": "release/index.js", "jsnext:main": "src/index.js", "author": "Parity Team ", diff --git a/mac/Parity.pkgproj b/mac/Parity.pkgproj index cc7810dba..1c87af629 100755 --- a/mac/Parity.pkgproj +++ b/mac/Parity.pkgproj @@ -462,7 +462,7 @@ OVERWRITE_PERMISSIONS VERSION - 1.7.0 + 1.8.0 UUID 2DCD5B81-7BAF-4DA1-9251-6274B089FD36 diff --git a/mac/Parity/Info.plist b/mac/Parity/Info.plist index 017939aec..dba951778 100644 --- a/mac/Parity/Info.plist +++ b/mac/Parity/Info.plist @@ -17,7 +17,7 @@ CFBundlePackageType APPL CFBundleShortVersionString - 1.6 + 1.8 CFBundleVersion 1 LSApplicationCategoryType diff --git a/nsis/installer.nsi b/nsis/installer.nsi index 7173beccc..5b7940302 100644 --- a/nsis/installer.nsi +++ b/nsis/installer.nsi @@ -9,7 +9,7 @@ !define COMPANYNAME "Parity" !define DESCRIPTION "Fast, light, robust Ethereum implementation" !define VERSIONMAJOR 1 -!define VERSIONMINOR 7 +!define VERSIONMINOR 8 !define VERSIONBUILD 0 !define ARGS "--warp" !define FIRST_START_ARGS "ui --warp --mode=passive" diff --git a/parity/cli/config.toml b/parity/cli/config.toml index 4af4ca076..08da653de 100644 --- a/parity/cli/config.toml +++ b/parity/cli/config.toml @@ -78,7 +78,7 @@ disable_periodic = true jit = false [misc] -ntp_server = "pool.ntp.org:123" 
+ntp_servers = ["0.parity.pool.ntp.org:123"] logging = "own_tx=trace" log_file = "/var/log/parity.log" color = true diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index c10069b02..b978918ce 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -359,8 +359,8 @@ usage! { or |c: &Config| otry!(c.vm).jit.clone(), // -- Miscellaneous Options - flag_ntp_server: String = "none", - or |c: &Config| otry!(c.misc).ntp_server.clone(), + flag_ntp_servers: String = "0.parity.pool.ntp.org:123,1.parity.pool.ntp.org:123,2.parity.pool.ntp.org:123,3.parity.pool.ntp.org:123", + or |c: &Config| otry!(c.misc).ntp_servers.clone().map(|vec| vec.join(",")), flag_logging: Option = None, or |c: &Config| otry!(c.misc).logging.clone().map(Some), flag_log_file: Option = None, @@ -606,7 +606,7 @@ struct VM { #[derive(Default, Debug, PartialEq, Deserialize)] struct Misc { - ntp_server: Option, + ntp_servers: Option>, logging: Option, log_file: Option, color: Option, @@ -919,7 +919,7 @@ mod tests { flag_dapps_apis_all: None, // -- Miscellaneous Options - flag_ntp_server: "none".into(), + flag_ntp_servers: "0.parity.pool.ntp.org:123,1.parity.pool.ntp.org:123,2.parity.pool.ntp.org:123,3.parity.pool.ntp.org:123".into(), flag_version: false, flag_logging: Some("own_tx=trace".into()), flag_log_file: Some("/var/log/parity.log".into()), @@ -1098,7 +1098,7 @@ mod tests { jit: Some(false), }), misc: Some(Misc { - ntp_server: Some("pool.ntp.org:123".into()), + ntp_servers: Some(vec!["0.parity.pool.ntp.org:123".into()]), logging: Some("own_tx=trace".into()), log_file: Some("/var/log/parity.log".into()), color: Some(true), diff --git a/parity/cli/usage.txt b/parity/cli/usage.txt index c1d1ab9de..cb090dced 100644 --- a/parity/cli/usage.txt +++ b/parity/cli/usage.txt @@ -78,7 +78,7 @@ Operating Options: Convenience Options: -c --config CONFIG Specify a configuration. CONFIG may be either a - configuration file or a preset: dev, insecure, dev-insecure, + configuration file or a preset: dev, insecure, dev-insecure, mining, or non-standard-ports. (default: {flag_config}). --ports-shift SHIFT Add SHIFT to all port numbers Parity is listening on. @@ -483,8 +483,10 @@ Internal Options: --can-restart Executable will auto-restart if exiting with 69. Miscellaneous Options: - --ntp-server HOST NTP server to provide current time (host:port). Used to verify node health. - (default: {flag_ntp_server}) + --ntp-servers HOSTS Comma separated list of NTP servers to provide current time (host:port). + Used to verify node health. Parity uses pool.ntp.org NTP servers, + consider joining the pool: http://www.pool.ntp.org/join.html + (default: {flag_ntp_servers}) -l --logging LOGGING Specify the logging level. Must conform to the same format as RUST_LOG. 
(default: {flag_logging:?}) --log-file FILENAME Specify a filename into which logging should be diff --git a/parity/configuration.rs b/parity/configuration.rs index 7a80f8f93..b28d2608e 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -551,10 +551,14 @@ impl Configuration { Ok(options) } + fn ntp_servers(&self) -> Vec { + self.args.flag_ntp_servers.split(",").map(str::to_owned).collect() + } + fn ui_config(&self) -> UiConfiguration { UiConfiguration { enabled: self.ui_enabled(), - ntp_server: self.args.flag_ntp_server.clone(), + ntp_servers: self.ntp_servers(), interface: self.ui_interface(), port: self.args.flag_ports_shift + self.args.flag_ui_port, hosts: self.ui_hosts(), @@ -564,7 +568,7 @@ impl Configuration { fn dapps_config(&self) -> DappsConfiguration { DappsConfiguration { enabled: self.dapps_enabled(), - ntp_server: self.args.flag_ntp_server.clone(), + ntp_servers: self.ntp_servers(), dapps_path: PathBuf::from(self.directories().dapps), extra_dapps: if self.args.cmd_dapp { self.args.arg_path.iter().map(|path| PathBuf::from(path)).collect() @@ -1278,7 +1282,12 @@ mod tests { support_token_api: true }, UiConfiguration { enabled: true, - ntp_server: "none".into(), + ntp_servers: vec![ + "0.parity.pool.ntp.org:123".into(), + "1.parity.pool.ntp.org:123".into(), + "2.parity.pool.ntp.org:123".into(), + "3.parity.pool.ntp.org:123".into(), + ], interface: "127.0.0.1".into(), port: 8180, hosts: Some(vec![]), @@ -1521,10 +1530,16 @@ mod tests { let conf3 = parse(&["parity", "--ui-path", "signer", "--ui-interface", "test"]); // then + let ntp_servers = vec![ + "0.parity.pool.ntp.org:123".into(), + "1.parity.pool.ntp.org:123".into(), + "2.parity.pool.ntp.org:123".into(), + "3.parity.pool.ntp.org:123".into(), + ]; assert_eq!(conf0.directories().signer, "signer".to_owned()); assert_eq!(conf0.ui_config(), UiConfiguration { enabled: true, - ntp_server: "none".into(), + ntp_servers: ntp_servers.clone(), interface: "127.0.0.1".into(), port: 8180, hosts: Some(vec![]), @@ -1533,7 +1548,7 @@ mod tests { assert_eq!(conf1.directories().signer, "signer".to_owned()); assert_eq!(conf1.ui_config(), UiConfiguration { enabled: true, - ntp_server: "none".into(), + ntp_servers: ntp_servers.clone(), interface: "127.0.0.1".into(), port: 8180, hosts: Some(vec![]), @@ -1543,7 +1558,7 @@ mod tests { assert_eq!(conf2.directories().signer, "signer".to_owned()); assert_eq!(conf2.ui_config(), UiConfiguration { enabled: true, - ntp_server: "none".into(), + ntp_servers: ntp_servers.clone(), interface: "127.0.0.1".into(), port: 3123, hosts: Some(vec![]), @@ -1552,7 +1567,7 @@ mod tests { assert_eq!(conf3.directories().signer, "signer".to_owned()); assert_eq!(conf3.ui_config(), UiConfiguration { enabled: true, - ntp_server: "none".into(), + ntp_servers: ntp_servers.clone(), interface: "test".into(), port: 8180, hosts: Some(vec![]), diff --git a/parity/dapps.rs b/parity/dapps.rs index cec3765f2..a67b3fe3a 100644 --- a/parity/dapps.rs +++ b/parity/dapps.rs @@ -36,7 +36,7 @@ use util::{Bytes, Address}; #[derive(Debug, PartialEq, Clone)] pub struct Configuration { pub enabled: bool, - pub ntp_server: String, + pub ntp_servers: Vec, pub dapps_path: PathBuf, pub extra_dapps: Vec, pub extra_embed_on: Vec<(String, u16)>, @@ -47,7 +47,12 @@ impl Default for Configuration { let data_dir = default_data_path(); Configuration { enabled: true, - ntp_server: "none".into(), + ntp_servers: vec![ + "0.parity.pool.ntp.org:123".into(), + "1.parity.pool.ntp.org:123".into(), + "2.parity.pool.ntp.org:123".into(), + 
"3.parity.pool.ntp.org:123".into(), + ], dapps_path: replace_home(&data_dir, "$BASE/dapps").into(), extra_dapps: vec![], extra_embed_on: vec![], @@ -158,7 +163,7 @@ pub fn new(configuration: Configuration, deps: Dependencies) -> Result Result Result, String> { +pub fn new_ui(enabled: bool, ntp_servers: &[String], deps: Dependencies) -> Result, String> { if !enabled { return Ok(None); } server::ui_middleware( deps, - ntp_server, + ntp_servers, rpc::DAPPS_DOMAIN, ).map(Some) } @@ -204,7 +209,7 @@ mod server { pub fn dapps_middleware( _deps: Dependencies, - _ntp_server: &str, + _ntp_servers: &[String], _dapps_path: PathBuf, _extra_dapps: Vec, _dapps_domain: &str, @@ -215,7 +220,7 @@ mod server { pub fn ui_middleware( _deps: Dependencies, - _ntp_server: &str, + _ntp_servers: &[String], _dapps_domain: &str, ) -> Result { Err("Your Parity version has been compiled without UI support.".into()) @@ -241,7 +246,7 @@ mod server { pub fn dapps_middleware( deps: Dependencies, - ntp_server: &str, + ntp_servers: &[String], dapps_path: PathBuf, extra_dapps: Vec, dapps_domain: &str, @@ -252,7 +257,7 @@ mod server { let web_proxy_tokens = Arc::new(move |token| signer.web_proxy_access_token_domain(&token)); Ok(parity_dapps::Middleware::dapps( - ntp_server, + ntp_servers, deps.pool, parity_remote, deps.ui_address, @@ -269,12 +274,12 @@ mod server { pub fn ui_middleware( deps: Dependencies, - ntp_server: &str, + ntp_servers: &[String], dapps_domain: &str, ) -> Result { let parity_remote = parity_reactor::Remote::new(deps.remote.clone()); Ok(parity_dapps::Middleware::ui( - ntp_server, + ntp_servers, deps.pool, parity_remote, dapps_domain, diff --git a/parity/rpc.rs b/parity/rpc.rs index b15c331d6..9173e7b2f 100644 --- a/parity/rpc.rs +++ b/parity/rpc.rs @@ -73,7 +73,7 @@ impl Default for HttpConfiguration { #[derive(Debug, PartialEq, Clone)] pub struct UiConfiguration { pub enabled: bool, - pub ntp_server: String, + pub ntp_servers: Vec, pub interface: String, pub port: u16, pub hosts: Option>, @@ -107,7 +107,12 @@ impl Default for UiConfiguration { fn default() -> Self { UiConfiguration { enabled: true && cfg!(feature = "ui-enabled"), - ntp_server: "none".into(), + ntp_servers: vec![ + "0.parity.pool.ntp.org:123".into(), + "1.parity.pool.ntp.org:123".into(), + "2.parity.pool.ntp.org:123".into(), + "3.parity.pool.ntp.org:123".into(), + ], port: 8180, interface: "127.0.0.1".into(), hosts: Some(vec![]), diff --git a/parity/run.rs b/parity/run.rs index c08294a42..0c5f11bbb 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -311,7 +311,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> }; let dapps_middleware = dapps::new(cmd.dapps_conf.clone(), dapps_deps.clone())?; - let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_server, dapps_deps)?; + let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_servers, dapps_deps)?; // start RPCs let dapps_service = dapps::service(&dapps_middleware); @@ -687,7 +687,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R } }; let dapps_middleware = dapps::new(cmd.dapps_conf.clone(), dapps_deps.clone())?; - let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_server, dapps_deps)?; + let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_servers, dapps_deps)?; let dapps_service = dapps::service(&dapps_middleware); let deps_for_rpc_apis = Arc::new(rpc_apis::FullDependencies { diff --git a/price-info/Cargo.toml b/price-info/Cargo.toml index b0226df8e..a6cc13eb7 100644 --- 
a/price-info/Cargo.toml +++ b/price-info/Cargo.toml @@ -13,4 +13,4 @@ log = "0.3" serde_json = "1.0" [dev-dependencies] -ethcore-util = { path = "../util" } +parking_lot = "0.4" diff --git a/price-info/src/lib.rs b/price-info/src/lib.rs index ec6fcfb5d..ba5719f40 100644 --- a/price-info/src/lib.rs +++ b/price-info/src/lib.rs @@ -26,6 +26,207 @@ extern crate log; pub extern crate fetch; -mod price_info; +use std::cmp; +use std::fmt; +use std::io; +use std::io::Read; -pub use price_info::*; +use fetch::{Client as FetchClient, Fetch}; +use futures::Future; +use serde_json::Value; + +/// Current ETH price information. +#[derive(Debug)] +pub struct PriceInfo { + /// Current ETH price in USD. + pub ethusd: f32, +} + +/// Price info error. +#[derive(Debug)] +pub enum Error { + /// The API returned an unexpected status code. + StatusCode(&'static str), + /// The API returned an unexpected status content. + UnexpectedResponse(String), + /// There was an error when trying to reach the API. + Fetch(fetch::Error), + /// IO error when reading API response. + Io(io::Error), +} + +impl From for Error { + fn from(err: io::Error) -> Self { Error::Io(err) } +} + +impl From for Error { + fn from(err: fetch::Error) -> Self { Error::Fetch(err) } +} + +/// A client to get the current ETH price using an external API. +pub struct Client { + api_endpoint: String, + fetch: F, +} + +impl fmt::Debug for Client { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("price_info::Client") + .field("api_endpoint", &self.api_endpoint) + .finish() + } +} + +impl cmp::PartialEq for Client { + fn eq(&self, other: &Client) -> bool { + self.api_endpoint == other.api_endpoint + } +} + +impl Client { + /// Creates a new instance of the `Client` given a `fetch::Client`. + pub fn new(fetch: F) -> Client { + let api_endpoint = "http://api.etherscan.io/api?module=stats&action=ethprice".to_owned(); + Client { api_endpoint, fetch } + } + + /// Gets the current ETH price and calls `set_price` with the result. 
+ pub fn get(&self, set_price: G) { + self.fetch.forget(self.fetch.fetch(&self.api_endpoint) + .map_err(|err| Error::Fetch(err)) + .and_then(move |mut response| { + if !response.is_success() { + return Err(Error::StatusCode(response.status().canonical_reason().unwrap_or("unknown"))); + } + let mut result = String::new(); + response.read_to_string(&mut result)?; + + let value: Option = serde_json::from_str(&result).ok(); + + let ethusd = value + .as_ref() + .and_then(|value| value.pointer("/result/ethusd")) + .and_then(|obj| obj.as_str()) + .and_then(|s| s.parse().ok()); + + match ethusd { + Some(ethusd) => { + set_price(PriceInfo { ethusd }); + Ok(()) + }, + None => Err(Error::UnexpectedResponse(result)), + } + }) + .map_err(|err| { + warn!("Failed to auto-update latest ETH price: {:?}", err); + err + }) + ); + } +} + +#[cfg(test)] +mod test { + extern crate parking_lot; + + use self::parking_lot::Mutex; + use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; + use fetch; + use fetch::Fetch; + use futures; + use futures::future::{Future, FutureResult}; + use Client; + + #[derive(Clone)] + struct FakeFetch(Option, Arc>); + impl Fetch for FakeFetch { + type Result = FutureResult; + fn new() -> Result where Self: Sized { Ok(FakeFetch(None, Default::default())) } + fn fetch_with_abort(&self, url: &str, _abort: fetch::Abort) -> Self::Result { + assert_eq!(url, "http://api.etherscan.io/api?module=stats&action=ethprice"); + let mut val = self.1.lock(); + *val = *val + 1; + if let Some(ref response) = self.0 { + let data = ::std::io::Cursor::new(response.clone()); + futures::future::ok(fetch::Response::from_reader(data)) + } else { + futures::future::ok(fetch::Response::not_found()) + } + } + + // this guarantees that the calls to price_info::Client::get will block for execution + fn forget(&self, f: F) where + F: Future + Send + 'static, + I: Send + 'static, + E: Send + 'static { + let _ = f.wait(); + } + } + + fn price_info_ok(response: &str) -> Client { + Client::new(FakeFetch(Some(response.to_owned()), Default::default())) + } + + fn price_info_not_found() -> Client { + Client::new(FakeFetch::new().unwrap()) + } + + #[test] + fn should_get_price_info() { + // given + let response = r#"{ + "status": "1", + "message": "OK", + "result": { + "ethbtc": "0.0891", + "ethbtc_timestamp": "1499894236", + "ethusd": "209.55", + "ethusd_timestamp": "1499894229" + } + }"#; + + let price_info = price_info_ok(response); + + // when + price_info.get(|price| { + + // then + assert_eq!(price.ethusd, 209.55); + }); + } + + #[test] + fn should_not_call_set_price_if_response_is_malformed() { + // given + let response = "{}"; + + let price_info = price_info_ok(response); + let b = Arc::new(AtomicBool::new(false)); + + // when + let bb = b.clone(); + price_info.get(move |_| { + bb.store(true, Ordering::Relaxed); + }); + + // then + assert_eq!(b.load(Ordering::Relaxed), false); + } + + #[test] + fn should_not_call_set_price_if_response_is_invalid() { + // given + let price_info = price_info_not_found(); + let b = Arc::new(AtomicBool::new(false)); + + // when + let bb = b.clone(); + price_info.get(move |_| { + bb.store(true, Ordering::Relaxed); + }); + + // then + assert_eq!(b.load(Ordering::Relaxed), false); + } +} diff --git a/price-info/src/price_info.rs b/price-info/src/price_info.rs deleted file mode 100644 index 36ca033d2..000000000 --- a/price-info/src/price_info.rs +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright 2015-2017 Parity Technologies (UK) Ltd. -// This file is part of Parity. 
- -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . - -use std::cmp; -use std::fmt; -use std::io; -use std::io::Read; -use std::str::FromStr; - -use fetch; -use fetch::{Client as FetchClient, Fetch}; -use futures::Future; -use serde_json; -use serde_json::Value; - -/// Current ETH price information. -#[derive(Debug)] -pub struct PriceInfo { - /// Current ETH price in USD. - pub ethusd: f32, -} - -/// Price info error. -#[derive(Debug)] -pub enum Error { - /// The API returned an unexpected status code or content. - UnexpectedResponse(&'static str, String), - /// There was an error when trying to reach the API. - Fetch(fetch::Error), - /// IO error when reading API response. - Io(io::Error), -} - -impl From for Error { - fn from(err: io::Error) -> Self { Error::Io(err) } -} - -impl From for Error { - fn from(err: fetch::Error) -> Self { Error::Fetch(err) } -} - -/// A client to get the current ETH price using an external API. -pub struct Client { - api_endpoint: String, - fetch: F, -} - -impl fmt::Debug for Client { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("price_info::Client") - .field("api_endpoint", &self.api_endpoint) - .finish() - } -} - -impl cmp::PartialEq for Client { - fn eq(&self, other: &Client) -> bool { - self.api_endpoint == other.api_endpoint - } -} - -impl Client { - /// Creates a new instance of the `Client` given a `fetch::Client`. - pub fn new(fetch: F) -> Client { - let api_endpoint = "http://api.etherscan.io/api?module=stats&action=ethprice".to_owned(); - Client { api_endpoint, fetch } - } - - /// Gets the current ETH price and calls `set_price` with the result. 
- pub fn get(&self, set_price: G) { - self.fetch.forget(self.fetch.fetch(&self.api_endpoint) - .map_err(|err| Error::Fetch(err)) - .and_then(move |mut response| { - let mut result = String::new(); - response.read_to_string(&mut result)?; - - if response.is_success() { - let value: Result = serde_json::from_str(&result); - if let Ok(v) = value { - let obj = v.pointer("/result/ethusd").and_then(|obj| { - match *obj { - Value::String(ref s) => FromStr::from_str(s).ok(), - _ => None, - } - }); - - if let Some(ethusd) = obj { - set_price(PriceInfo { ethusd }); - return Ok(()); - } - } - } - - let status = response.status().canonical_reason().unwrap_or("unknown"); - Err(Error::UnexpectedResponse(status, result)) - }) - .map_err(|err| { - warn!("Failed to auto-update latest ETH price: {:?}", err); - err - }) - ); - } -} - -#[cfg(test)] -mod test { - extern crate ethcore_util as util; - - use self::util::Mutex; - use std::sync::Arc; - use std::sync::atomic::{AtomicBool, Ordering}; - use fetch; - use fetch::Fetch; - use futures; - use futures::future::{Future, FutureResult}; - use price_info::Client; - - #[derive(Clone)] - struct FakeFetch(Option, Arc>); - impl Fetch for FakeFetch { - type Result = FutureResult; - fn new() -> Result where Self: Sized { Ok(FakeFetch(None, Default::default())) } - fn fetch_with_abort(&self, url: &str, _abort: fetch::Abort) -> Self::Result { - assert_eq!(url, "http://api.etherscan.io/api?module=stats&action=ethprice"); - let mut val = self.1.lock(); - *val = *val + 1; - if let Some(ref response) = self.0 { - let data = ::std::io::Cursor::new(response.clone()); - futures::future::ok(fetch::Response::from_reader(data)) - } else { - futures::future::ok(fetch::Response::not_found()) - } - } - - // this guarantees that the calls to price_info::Client::get will block for execution - fn forget(&self, f: F) where - F: Future + Send + 'static, - I: Send + 'static, - E: Send + 'static { - let _ = f.wait(); - } - } - - fn price_info_ok(response: &str) -> Client { - Client::new(FakeFetch(Some(response.to_owned()), Default::default())) - } - - fn price_info_not_found() -> Client { - Client::new(FakeFetch::new().unwrap()) - } - - #[test] - fn should_get_price_info() { - // given - let response = r#"{ - "status": "1", - "message": "OK", - "result": { - "ethbtc": "0.0891", - "ethbtc_timestamp": "1499894236", - "ethusd": "209.55", - "ethusd_timestamp": "1499894229" - } - }"#; - - let price_info = price_info_ok(response); - - // when - price_info.get(|price| { - - // then - assert_eq!(price.ethusd, 209.55); - }); - } - - #[test] - fn should_not_call_set_price_if_response_is_malformed() { - // given - let response = "{}"; - - let price_info = price_info_ok(response); - let b = Arc::new(AtomicBool::new(false)); - - // when - let bb = b.clone(); - price_info.get(move |_| { - bb.store(true, Ordering::Relaxed); - }); - - // then - assert_eq!(b.load(Ordering::Relaxed), false); - } - - #[test] - fn should_not_call_set_price_if_response_is_invalid() { - // given - let price_info = price_info_not_found(); - let b = Arc::new(AtomicBool::new(false)); - - // when - let bb = b.clone(); - price_info.get(move |_| { - bb.store(true, Ordering::Relaxed); - }); - - // then - assert_eq!(b.load(Ordering::Relaxed), false); - } -} diff --git a/secret_store/Cargo.toml b/secret_store/Cargo.toml index eea49978d..19f342aa9 100644 --- a/secret_store/Cargo.toml +++ b/secret_store/Cargo.toml @@ -35,3 +35,4 @@ ethcore-logger = { path = "../logger" } ethcrypto = { path = "../ethcrypto" } ethkey = { path = "../ethkey" 
} native-contracts = { path = "../ethcore/native_contracts" } +lazy_static = "0.2" diff --git a/secret_store/src/acl_storage.rs b/secret_store/src/acl_storage.rs index 816d100dc..37d5bcd25 100644 --- a/secret_store/src/acl_storage.rs +++ b/secret_store/src/acl_storage.rs @@ -14,12 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::sync::Arc; +use std::sync::{Arc, Weak}; use futures::{future, Future}; use parking_lot::Mutex; use ethkey::public_to_address; -use ethcore::client::{Client, BlockChainClient, BlockId}; +use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify}; use native_contracts::SecretStoreAclStorage; +use util::{H256, Address, Bytes}; use types::all::{Error, ServerKeyId, Public}; const ACL_CHECKER_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_acl_checker"; @@ -32,40 +33,82 @@ pub trait AclStorage: Send + Sync { /// On-chain ACL storage implementation. pub struct OnChainAclStorage { + /// Cached on-chain contract. + contract: Mutex, +} + +/// Cached on-chain ACL storage contract. +struct CachedContract { /// Blockchain client. - client: Arc, - /// On-chain contract. - contract: Mutex>, + client: Weak, + /// Contract address. + contract_addr: Option
, + /// Contract at given address. + contract: Option, } impl OnChainAclStorage { - pub fn new(client: Arc) -> Self { - OnChainAclStorage { - client: client, - contract: Mutex::new(None), - } + pub fn new(client: &Arc) -> Arc { + let acl_storage = Arc::new(OnChainAclStorage { + contract: Mutex::new(CachedContract::new(client)), + }); + client.add_notify(acl_storage.clone()); + acl_storage } } impl AclStorage for OnChainAclStorage { fn check(&self, public: &Public, document: &ServerKeyId) -> Result { - let mut contract = self.contract.lock(); - if !contract.is_some() { - *contract = self.client.registry_address(ACL_CHECKER_CONTRACT_REGISTRY_NAME.to_owned()) - .and_then(|contract_addr| { + self.contract.lock().check(public, document) + } +} + +impl ChainNotify for OnChainAclStorage { + fn new_blocks(&self, _imported: Vec, _invalid: Vec, enacted: Vec, retracted: Vec, _sealed: Vec, _proposed: Vec, _duration: u64) { + if !enacted.is_empty() || !retracted.is_empty() { + self.contract.lock().update() + } + } +} + +impl CachedContract { + pub fn new(client: &Arc) -> Self { + CachedContract { + client: Arc::downgrade(client), + contract_addr: None, + contract: None, + } + } + + pub fn update(&mut self) { + if let Some(client) = self.client.upgrade() { + let new_contract_addr = client.registry_address(ACL_CHECKER_CONTRACT_REGISTRY_NAME.to_owned()); + if self.contract_addr.as_ref() != new_contract_addr.as_ref() { + self.contract = new_contract_addr.map(|contract_addr| { trace!(target: "secretstore", "Configuring for ACL checker contract from {}", contract_addr); - Some(SecretStoreAclStorage::new(contract_addr)) - }) + SecretStoreAclStorage::new(contract_addr) + }); + + self.contract_addr = new_contract_addr; + } } - if let Some(ref contract) = *contract { - let address = public_to_address(&public); - let do_call = |a, d| future::done(self.client.call_contract(BlockId::Latest, a, d)); - contract.check_permissions(do_call, address, document.clone()) - .map_err(|err| Error::Internal(err)) - .wait() - } else { - Err(Error::Internal("ACL checker contract is not configured".to_owned())) + } + + pub fn check(&mut self, public: &Public, document: &ServerKeyId) -> Result { + match self.contract.as_ref() { + Some(contract) => { + let address = public_to_address(&public); + let do_call = |a, d| future::done( + self.client + .upgrade() + .ok_or("Calling contract without client".into()) + .and_then(|c| c.call_contract(BlockId::Latest, a, d))); + contract.check_permissions(do_call, address, document.clone()) + .map_err(|err| Error::Internal(err)) + .wait() + }, + None => Err(Error::Internal("ACL checker contract is not configured".to_owned())), } } } diff --git a/secret_store/src/key_server.rs b/secret_store/src/key_server.rs index fd4e154fa..c83e460f3 100644 --- a/secret_store/src/key_server.rs +++ b/secret_store/src/key_server.rs @@ -24,6 +24,7 @@ use ethcrypto; use ethkey; use super::acl_storage::AclStorage; use super::key_storage::KeyStorage; +use super::key_server_set::KeyServerSet; use key_server_cluster::{math, ClusterCore}; use traits::{ServerKeyGenerator, DocumentKeyServer, MessageSigner, KeyServer}; use types::all::{Error, Public, RequestSignature, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow, @@ -44,9 +45,9 @@ pub struct KeyServerCore { impl KeyServerImpl { /// Create new key server instance - pub fn new(config: &ClusterConfiguration, acl_storage: Arc, key_storage: Arc) -> Result { + pub fn new(config: &ClusterConfiguration, key_server_set: Arc, acl_storage: Arc, key_storage: Arc) -> 
Result { Ok(KeyServerImpl { - data: Arc::new(Mutex::new(KeyServerCore::new(config, acl_storage, key_storage)?)), + data: Arc::new(Mutex::new(KeyServerCore::new(config, key_server_set, acl_storage, key_storage)?)), }) } @@ -143,14 +144,12 @@ impl MessageSigner for KeyServerImpl { } impl KeyServerCore { - pub fn new(config: &ClusterConfiguration, acl_storage: Arc, key_storage: Arc) -> Result { + pub fn new(config: &ClusterConfiguration, key_server_set: Arc, acl_storage: Arc, key_storage: Arc) -> Result { let config = NetClusterConfiguration { threads: config.threads, self_key_pair: ethkey::KeyPair::from_secret_slice(&config.self_private)?, listen_address: (config.listener_address.address.clone(), config.listener_address.port), - nodes: config.nodes.iter() - .map(|(node_id, node_address)| (node_id.clone(), (node_address.address.clone(), node_address.port))) - .collect(), + key_server_set: key_server_set, allow_connecting_to_higher_nodes: config.allow_connecting_to_higher_nodes, acl_storage: acl_storage, key_storage: key_storage, @@ -193,10 +192,13 @@ impl Drop for KeyServerCore { pub mod tests { use std::time; use std::sync::Arc; + use std::net::SocketAddr; + use std::collections::BTreeMap; use ethcrypto; use ethkey::{self, Secret, Random, Generator}; use acl_storage::tests::DummyAclStorage; use key_storage::tests::DummyKeyStorage; + use key_server_set::tests::MapKeyServerSet; use key_server_cluster::math; use util::H256; use types::all::{Error, Public, ClusterConfiguration, NodeAddress, RequestSignature, ServerKeyId, @@ -254,8 +256,11 @@ pub mod tests { })).collect(), allow_connecting_to_higher_nodes: false, }).collect(); + let key_servers_set: BTreeMap = configs[0].nodes.iter() + .map(|(k, a)| (k.clone(), format!("{}:{}", a.address, a.port).parse().unwrap())) + .collect(); let key_servers: Vec<_> = configs.into_iter().map(|cfg| - KeyServerImpl::new(&cfg, Arc::new(DummyAclStorage::default()), Arc::new(DummyKeyStorage::default())).unwrap() + KeyServerImpl::new(&cfg, Arc::new(MapKeyServerSet::new(key_servers_set.clone())), Arc::new(DummyAclStorage::default()), Arc::new(DummyKeyStorage::default())).unwrap() ).collect(); // wait until connections are established. It is fast => do not bother with events here diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index c86f30267..d77a82431 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -28,7 +28,7 @@ use tokio_core::reactor::{Handle, Remote, Interval}; use tokio_core::net::{TcpListener, TcpStream}; use ethkey::{Public, KeyPair, Signature, Random, Generator}; use util::H256; -use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage}; +use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage, KeyServerSet}; use key_server_cluster::cluster_sessions::{ClusterSession, ClusterSessions, GenerationSessionWrapper, EncryptionSessionWrapper, DecryptionSessionWrapper, SigningSessionWrapper}; use key_server_cluster::message::{self, Message, ClusterMessage, GenerationMessage, EncryptionMessage, DecryptionMessage, @@ -102,8 +102,8 @@ pub struct ClusterConfiguration { pub self_key_pair: KeyPair, /// Interface to listen to. pub listen_address: (String, u16), - /// Cluster nodes. - pub nodes: BTreeMap, + /// Cluster nodes set. + pub key_server_set: Arc, /// Reference to key storage pub key_storage: Arc, /// Reference to ACL storage @@ -158,9 +158,17 @@ pub struct ClusterConnections { /// Self node id. 
pub self_node_id: NodeId, /// All known other key servers. - pub nodes: BTreeMap, + pub key_server_set: Arc, + /// Connections data. + pub data: RwLock, +} + +/// Cluster connections data. +pub struct ClusterConnectionsData { + /// Active key servers set. + pub nodes: BTreeMap, /// Active connections to key servers. - pub connections: RwLock>>, + pub connections: BTreeMap>, } /// Cluster view core. @@ -281,8 +289,7 @@ impl ClusterCore { /// Accept connection future. fn accept_connection_future(handle: &Handle, data: Arc, stream: TcpStream, node_address: SocketAddr) -> BoxedEmptyFuture { - let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect(); - net_accept_connection(node_address, stream, handle, data.self_key_pair.clone(), disconnected_nodes) + net_accept_connection(node_address, stream, handle, data.self_key_pair.clone()) .then(move |result| ClusterCore::process_connection_result(data, true, result)) .then(|_| finished(())) .boxed() @@ -354,6 +361,7 @@ impl ClusterCore { /// Try to connect to every disconnected node. fn connect_disconnected_nodes(data: Arc) { + data.connections.update_nodes_set(); for (node_id, node_address) in data.connections.disconnected_nodes() { if data.config.allow_connecting_to_higher_nodes || data.self_key_pair.public() < &node_id { ClusterCore::connect(data.clone(), node_address); @@ -372,14 +380,16 @@ impl ClusterCore { finished(Ok(())).boxed() } }, - Ok(DeadlineStatus::Meet(Err(_))) => { + Ok(DeadlineStatus::Meet(Err(err))) => { + warn!(target: "secretstore_net", "{}: protocol error {} when establishind connection", data.self_key_pair.public(), err); finished(Ok(())).boxed() }, Ok(DeadlineStatus::Timeout) => { + warn!(target: "secretstore_net", "{}: timeout when establishind connection", data.self_key_pair.public()); finished(Ok(())).boxed() }, - Err(_) => { - // network error + Err(err) => { + warn!(target: "secretstore_net", "{}: network error {} when establishind connection", data.self_key_pair.public(), err); finished(Ok(())).boxed() }, } @@ -665,33 +675,38 @@ impl ClusterCore { impl ClusterConnections { pub fn new(config: &ClusterConfiguration) -> Result { - let mut connections = ClusterConnections { + let mut nodes = config.key_server_set.get(); + nodes.remove(config.self_key_pair.public()); + + Ok(ClusterConnections { self_node_id: config.self_key_pair.public().clone(), - nodes: BTreeMap::new(), - connections: RwLock::new(BTreeMap::new()), - }; - - for (node_id, &(ref node_addr, node_port)) in config.nodes.iter().filter(|&(node_id, _)| node_id != config.self_key_pair.public()) { - let socket_address = make_socket_address(&node_addr, node_port)?; - connections.nodes.insert(node_id.clone(), socket_address); - } - - Ok(connections) + key_server_set: config.key_server_set.clone(), + data: RwLock::new(ClusterConnectionsData { + nodes: nodes, + connections: BTreeMap::new(), + }), + }) } pub fn cluster_state(&self) -> ClusterState { ClusterState { - connected: self.connections.read().keys().cloned().collect(), + connected: self.data.read().connections.keys().cloned().collect(), } } pub fn get(&self, node: &NodeId) -> Option> { - self.connections.read().get(node).cloned() + self.data.read().connections.get(node).cloned() } pub fn insert(&self, connection: Arc) -> bool { - let mut connections = self.connections.write(); - if connections.contains_key(connection.node_id()) { + let mut data = self.data.write(); + if !data.nodes.contains_key(connection.node_id()) { + // incoming connections are checked here + trace!(target: 
"secretstore_net", "{}: ignoring unknown connection from {} at {}", self.self_node_id, connection.node_id(), connection.node_address()); + debug_assert!(connection.is_inbound()); + return false; + } + if data.connections.contains_key(connection.node_id()) { // we have already connected to the same node // the agreement is that node with lower id must establish connection to node with higher id if (&self.self_node_id < connection.node_id() && connection.is_inbound()) @@ -700,14 +715,15 @@ impl ClusterConnections { } } - trace!(target: "secretstore_net", "{}: inserting connection to {} at {}", self.self_node_id, connection.node_id(), connection.node_address()); - connections.insert(connection.node_id().clone(), connection); + trace!(target: "secretstore_net", "{}: inserting connection to {} at {}. Connected to {} of {} nodes", + self.self_node_id, connection.node_id(), connection.node_address(), data.connections.len() + 1, data.nodes.len()); + data.connections.insert(connection.node_id().clone(), connection); true } pub fn remove(&self, node: &NodeId, is_inbound: bool) { - let mut connections = self.connections.write(); - if let Entry::Occupied(entry) = connections.entry(node.clone()) { + let mut data = self.data.write(); + if let Entry::Occupied(entry) = data.connections.entry(node.clone()) { if entry.get().is_inbound() != is_inbound { return; } @@ -718,20 +734,64 @@ impl ClusterConnections { } pub fn connected_nodes(&self) -> BTreeSet { - self.connections.read().keys().cloned().collect() + self.data.read().connections.keys().cloned().collect() } pub fn active_connections(&self)-> Vec> { - self.connections.read().values().cloned().collect() + self.data.read().connections.values().cloned().collect() } pub fn disconnected_nodes(&self) -> BTreeMap { - let connections = self.connections.read(); - self.nodes.iter() - .filter(|&(node_id, _)| !connections.contains_key(node_id)) + let data = self.data.read(); + data.nodes.iter() + .filter(|&(node_id, _)| !data.connections.contains_key(node_id)) .map(|(node_id, node_address)| (node_id.clone(), node_address.clone())) .collect() } + + pub fn update_nodes_set(&self) { + let mut data = self.data.write(); + let mut new_nodes = self.key_server_set.get(); + // we do not need to connect to self + // + we do not need to try to connect to any other node if we are not the part of a cluster + if new_nodes.remove(&self.self_node_id).is_none() { + new_nodes.clear(); + } + + let mut num_added_nodes = 0; + let mut num_removed_nodes = 0; + let mut num_changed_nodes = 0; + + for obsolete_node in data.nodes.keys().cloned().collect::>() { + if !new_nodes.contains_key(&obsolete_node) { + if let Entry::Occupied(entry) = data.connections.entry(obsolete_node) { + trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address()); + entry.remove(); + } + + data.nodes.remove(&obsolete_node); + num_removed_nodes += 1; + } + } + + for (new_node_public, new_node_addr) in new_nodes { + match data.nodes.insert(new_node_public, new_node_addr) { + None => num_added_nodes += 1, + Some(old_node_addr) => if new_node_addr != old_node_addr { + if let Entry::Occupied(entry) = data.connections.entry(new_node_public) { + trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address()); + entry.remove(); + } + num_changed_nodes += 1; + }, + } + } + + if num_added_nodes != 0 || num_removed_nodes != 0 || num_changed_nodes != 0 { + 
trace!(target: "secretstore_net", "{}: updated nodes set: removed {}, added {}, changed {}. Connected to {} of {} nodes", + self.self_node_id, num_removed_nodes, num_added_nodes, num_changed_nodes, data.connections.len(), data.nodes.len()); + } + } } impl ClusterData { @@ -929,7 +989,7 @@ pub mod tests { use parking_lot::Mutex; use tokio_core::reactor::Core; use ethkey::{Random, Generator, Public}; - use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage}; + use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage, MapKeyServerSet}; use key_server_cluster::message::Message; use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration}; use key_server_cluster::generation_session::{Session as GenerationSession, SessionState as GenerationSessionState}; @@ -999,7 +1059,7 @@ pub mod tests { } pub fn all_connections_established(cluster: &Arc) -> bool { - cluster.config().nodes.keys() + cluster.config().key_server_set.get().keys() .filter(|p| *p != cluster.config().self_key_pair.public()) .all(|p| cluster.connection(p).is_some()) } @@ -1010,9 +1070,9 @@ pub mod tests { threads: 1, self_key_pair: key_pairs[i].clone(), listen_address: ("127.0.0.1".to_owned(), ports_begin + i as u16), - nodes: key_pairs.iter().enumerate() - .map(|(j, kp)| (kp.public().clone(), ("127.0.0.1".into(), ports_begin + j as u16))) - .collect(), + key_server_set: Arc::new(MapKeyServerSet::new(key_pairs.iter().enumerate() + .map(|(j, kp)| (kp.public().clone(), format!("127.0.0.1:{}", ports_begin + j as u16).parse().unwrap())) + .collect())), allow_connecting_to_higher_nodes: false, key_storage: Arc::new(DummyKeyStorage::default()), acl_storage: Arc::new(DummyAclStorage::default()), diff --git a/secret_store/src/key_server_cluster/cluster_sessions.rs b/secret_store/src/key_server_cluster/cluster_sessions.rs index f66ad972f..f8e4974b1 100644 --- a/secret_store/src/key_server_cluster/cluster_sessions.rs +++ b/secret_store/src/key_server_cluster/cluster_sessions.rs @@ -135,7 +135,7 @@ impl ClusterSessions { pub fn new(config: &ClusterConfiguration) -> Self { ClusterSessions { self_node_id: config.self_key_pair.public().clone(), - nodes: config.nodes.keys().cloned().collect(), + nodes: config.key_server_set.get().keys().cloned().collect(), acl_storage: config.acl_storage.clone(), key_storage: config.key_storage.clone(), generation_sessions: ClusterSessionsContainer::new(), diff --git a/secret_store/src/key_server_cluster/io/handshake.rs b/secret_store/src/key_server_cluster/io/handshake.rs index 38d8a6ac1..df8f6cbf7 100644 --- a/secret_store/src/key_server_cluster/io/handshake.rs +++ b/secret_store/src/key_server_cluster/io/handshake.rs @@ -45,7 +45,7 @@ pub fn handshake_with_plain_confirmation(a: A, self_confirmation_plain: Resul state: state, self_key_pair: self_key_pair, self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()), - trusted_nodes: trusted_nodes, + trusted_nodes: Some(trusted_nodes), other_node_id: None, other_confirmation_plain: None, shared_key: None, @@ -53,7 +53,7 @@ pub fn handshake_with_plain_confirmation(a: A, self_confirmation_plain: Resul } /// Wait for handshake procedure to be started by another node from the cluster. 
-pub fn accept_handshake(a: A, self_key_pair: KeyPair, trusted_nodes: BTreeSet) -> Handshake where A: AsyncWrite + AsyncRead { +pub fn accept_handshake(a: A, self_key_pair: KeyPair) -> Handshake where A: AsyncWrite + AsyncRead { let self_confirmation_plain = Random.generate().map(|kp| *kp.secret().clone()).map_err(Into::into); let (error, state) = match self_confirmation_plain.clone() { Ok(_) => (None, HandshakeState::ReceivePublicKey(read_message(a))), @@ -66,7 +66,7 @@ pub fn accept_handshake(a: A, self_key_pair: KeyPair, trusted_nodes: BTreeSet state: state, self_key_pair: self_key_pair, self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()), - trusted_nodes: trusted_nodes, + trusted_nodes: None, other_node_id: None, other_confirmation_plain: None, shared_key: None, @@ -89,7 +89,7 @@ pub struct Handshake { state: HandshakeState, self_key_pair: KeyPair, self_confirmation_plain: H256, - trusted_nodes: BTreeSet, + trusted_nodes: Option>, other_node_id: Option, other_confirmation_plain: Option, shared_key: Option, @@ -172,7 +172,7 @@ impl Future for Handshake where A: AsyncRead + AsyncWrite { Err(err) => return Ok((stream, Err(err.into())).into()), }; - if !self.trusted_nodes.contains(&*message.node_id) { + if !self.trusted_nodes.as_ref().map(|tn| tn.contains(&*message.node_id)).unwrap_or(true) { return Ok((stream, Err(Error::InvalidNodeId)).into()); } @@ -300,7 +300,7 @@ mod tests { let trusted_nodes: BTreeSet<_> = vec![io.peer_public().clone()].into_iter().collect(); let shared_key = compute_shared_key(self_key_pair.secret(), trusted_nodes.iter().nth(0).unwrap()).unwrap(); - let mut handshake = accept_handshake(io, self_key_pair, trusted_nodes); + let mut handshake = accept_handshake(io, self_key_pair); handshake.set_self_confirmation_plain(self_confirmation_plain); let handshake_result = handshake.wait().unwrap(); diff --git a/secret_store/src/key_server_cluster/jobs/job_session.rs b/secret_store/src/key_server_cluster/jobs/job_session.rs index 7ae1da42a..6608397dd 100644 --- a/secret_store/src/key_server_cluster/jobs/job_session.rs +++ b/secret_store/src/key_server_cluster/jobs/job_session.rs @@ -299,22 +299,22 @@ impl JobSession where Executor: JobExe return Err(Error::ConsensusUnreachable); } - let active_data = self.data.active_data.as_mut() - .expect("we have checked that we are on master node; on master nodes active_data is filled during initialization; qed"); - if active_data.rejects.contains(node) { - return Ok(()); - } - if active_data.requests.remove(node) || active_data.responses.remove(node).is_some() { - active_data.rejects.insert(node.clone()); - if self.data.state == JobSessionState::Finished && active_data.responses.len() < self.meta.threshold + 1 { - self.data.state = JobSessionState::Active; - } - if active_data.requests.len() + active_data.responses.len() >= self.meta.threshold + 1 { + if let Some(active_data) = self.data.active_data.as_mut() { + if active_data.rejects.contains(node) { return Ok(()); } + if active_data.requests.remove(node) || active_data.responses.remove(node).is_some() { + active_data.rejects.insert(node.clone()); + if self.data.state == JobSessionState::Finished && active_data.responses.len() < self.meta.threshold + 1 { + self.data.state = JobSessionState::Active; + } + if active_data.requests.len() + active_data.responses.len() >= self.meta.threshold + 1 { + return Ok(()); + } - self.data.state = JobSessionState::Failed; - return Err(Error::ConsensusUnreachable); + self.data.state = JobSessionState::Failed; + return 
Err(Error::ConsensusUnreachable); + } } Ok(()) diff --git a/secret_store/src/key_server_cluster/mod.rs b/secret_store/src/key_server_cluster/mod.rs index 71c505f95..8f6ae4add 100644 --- a/secret_store/src/key_server_cluster/mod.rs +++ b/secret_store/src/key_server_cluster/mod.rs @@ -23,6 +23,7 @@ use super::types::all::ServerKeyId; pub use super::types::all::{NodeId, EncryptedDocumentKeyShadow}; pub use super::acl_storage::AclStorage; pub use super::key_storage::{KeyStorage, DocumentKeyShare}; +pub use super::key_server_set::KeyServerSet; pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic, SerializableMessageHash}; pub use self::cluster::{ClusterCore, ClusterConfiguration, ClusterClient}; pub use self::generation_session::Session as GenerationSession; @@ -33,6 +34,8 @@ pub use self::decryption_session::Session as DecryptionSession; pub use super::key_storage::tests::DummyKeyStorage; #[cfg(test)] pub use super::acl_storage::tests::DummyAclStorage; +#[cfg(test)] +pub use super::key_server_set::tests::MapKeyServerSet; pub type SessionId = ServerKeyId; diff --git a/secret_store/src/key_server_cluster/net/accept_connection.rs b/secret_store/src/key_server_cluster/net/accept_connection.rs index 0daa8b2da..339625f3f 100644 --- a/secret_store/src/key_server_cluster/net/accept_connection.rs +++ b/secret_store/src/key_server_cluster/net/accept_connection.rs @@ -17,19 +17,18 @@ use std::io; use std::net::SocketAddr; use std::time::Duration; -use std::collections::BTreeSet; use futures::{Future, Poll}; use tokio_core::reactor::Handle; use tokio_core::net::TcpStream; use ethkey::KeyPair; -use key_server_cluster::{Error, NodeId}; +use key_server_cluster::Error; use key_server_cluster::io::{accept_handshake, Handshake, Deadline, deadline}; use key_server_cluster::net::Connection; /// Create future for accepting incoming connection. -pub fn accept_connection(address: SocketAddr, stream: TcpStream, handle: &Handle, self_key_pair: KeyPair, trusted_nodes: BTreeSet) -> Deadline { +pub fn accept_connection(address: SocketAddr, stream: TcpStream, handle: &Handle, self_key_pair: KeyPair) -> Deadline { let accept = AcceptConnection { - handshake: accept_handshake(stream, self_key_pair, trusted_nodes), + handshake: accept_handshake(stream, self_key_pair), address: address, }; diff --git a/secret_store/src/key_server_set.rs b/secret_store/src/key_server_set.rs new file mode 100644 index 000000000..e17dceed5 --- /dev/null +++ b/secret_store/src/key_server_set.rs @@ -0,0 +1,204 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . 
+
+use std::sync::{Arc, Weak};
+use std::net::SocketAddr;
+use std::collections::BTreeMap;
+use futures::{future, Future};
+use parking_lot::Mutex;
+use ethcore::filter::Filter;
+use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify};
+use native_contracts::KeyServerSet as KeyServerSetContract;
+use util::{H256, Address, Bytes, Hashable};
+use types::all::{Error, Public, NodeAddress};
+
+const KEY_SERVER_SET_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_server_set";
+
+/// Key server has been added to the set.
+const ADDED_EVENT_NAME: &'static [u8] = &*b"KeyServerAdded(address)";
+/// Key server has been removed from the set.
+const REMOVED_EVENT_NAME: &'static [u8] = &*b"KeyServerRemoved(address)";
+
+lazy_static! {
+    static ref ADDED_EVENT_NAME_HASH: H256 = ADDED_EVENT_NAME.sha3();
+    static ref REMOVED_EVENT_NAME_HASH: H256 = REMOVED_EVENT_NAME.sha3();
+}
+
+/// Key Server set
+pub trait KeyServerSet: Send + Sync {
+    /// Get set of configured key servers
+    fn get(&self) -> BTreeMap<Public, SocketAddr>;
+}
+
+/// On-chain Key Server set implementation.
+pub struct OnChainKeyServerSet {
+    /// Cached on-chain contract.
+    contract: Mutex<CachedContract>,
+}
+
+/// Cached on-chain Key Server set contract.
+struct CachedContract {
+    /// Blockchain client.
+    client: Weak<Client>,
+    /// Contract address.
+    contract_addr: Option<Address>
, + /// Active set of key servers. + key_servers: BTreeMap, +} + +impl OnChainKeyServerSet { + pub fn new(client: &Arc, key_servers: BTreeMap) -> Result, Error> { + let mut cached_contract = CachedContract::new(client, key_servers)?; + let key_server_contract_address = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned()); + // only initialize from contract if it is installed. otherwise - use default nodes + // once the contract is installed, all default nodes are lost (if not in the contract' set) + if key_server_contract_address.is_some() { + cached_contract.read_from_registry(&*client, key_server_contract_address); + } + + let key_server_set = Arc::new(OnChainKeyServerSet { + contract: Mutex::new(cached_contract), + }); + client.add_notify(key_server_set.clone()); + Ok(key_server_set) + } +} + +impl KeyServerSet for OnChainKeyServerSet { + fn get(&self) -> BTreeMap { + self.contract.lock().get() + } +} + +impl ChainNotify for OnChainKeyServerSet { + fn new_blocks(&self, _imported: Vec, _invalid: Vec, enacted: Vec, retracted: Vec, _sealed: Vec, _proposed: Vec, _duration: u64) { + if !enacted.is_empty() || !retracted.is_empty() { + self.contract.lock().update(enacted, retracted) + } + } +} + +impl CachedContract { + pub fn new(client: &Arc, key_servers: BTreeMap) -> Result { + Ok(CachedContract { + client: Arc::downgrade(client), + contract_addr: None, + key_servers: key_servers.into_iter() + .map(|(p, addr)| { + let addr = format!("{}:{}", addr.address, addr.port).parse() + .map_err(|err| Error::Internal(format!("error parsing node address: {}", err)))?; + Ok((p, addr)) + }) + .collect::, Error>>()?, + }) + } + + pub fn update(&mut self, enacted: Vec, retracted: Vec) { + if let Some(client) = self.client.upgrade() { + let new_contract_addr = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned()); + + // new contract installed => read nodes set from the contract + if self.contract_addr.as_ref() != new_contract_addr.as_ref() { + self.read_from_registry(&*client, new_contract_addr); + return; + } + + // check for contract events + let is_set_changed = self.contract_addr.is_some() && enacted.iter() + .chain(retracted.iter()) + .any(|block_hash| !client.logs(Filter { + from_block: BlockId::Hash(block_hash.clone()), + to_block: BlockId::Hash(block_hash.clone()), + address: self.contract_addr.clone().map(|a| vec![a]), + topics: vec![ + Some(vec![*ADDED_EVENT_NAME_HASH, *REMOVED_EVENT_NAME_HASH]), + None, + None, + None, + ], + limit: Some(1), + }).is_empty()); + // to simplify processing - just re-read the whole nodes set from the contract + if is_set_changed { + self.read_from_registry(&*client, new_contract_addr); + } + } + } + + pub fn get(&self) -> BTreeMap { + self.key_servers.clone() + } + + fn read_from_registry(&mut self, client: &Client, new_contract_address: Option
) { + self.key_servers = new_contract_address.map(|contract_addr| { + trace!(target: "secretstore", "Configuring for key server set contract from {}", contract_addr); + + KeyServerSetContract::new(contract_addr) + }) + .map(|contract| { + let mut key_servers = BTreeMap::new(); + let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d)); + let key_servers_list = contract.get_key_servers(do_call).wait() + .map_err(|err| { trace!(target: "secretstore", "Error {} reading list of key servers from contract", err); err }) + .unwrap_or_default(); + for key_server in key_servers_list { + let key_server_public = contract.get_key_server_public( + |a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait() + .and_then(|p| if p.len() == 64 { Ok(Public::from_slice(&p)) } else { Err(format!("Invalid public length {}", p.len())) }); + let key_server_ip = contract.get_key_server_address( + |a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait() + .and_then(|a| a.parse().map_err(|e| format!("Invalid ip address: {}", e))); + + // only add successfully parsed nodes + match (key_server_public, key_server_ip) { + (Ok(key_server_public), Ok(key_server_ip)) => { key_servers.insert(key_server_public, key_server_ip); }, + (Err(public_err), _) => warn!(target: "secretstore_net", "received invalid public from key server set contract: {}", public_err), + (_, Err(ip_err)) => warn!(target: "secretstore_net", "received invalid IP from key server set contract: {}", ip_err), + } + } + key_servers + }) + .unwrap_or_default(); + self.contract_addr = new_contract_address; + } +} + +#[cfg(test)] +pub mod tests { + use std::collections::BTreeMap; + use std::net::SocketAddr; + use ethkey::Public; + use super::KeyServerSet; + + #[derive(Default)] + pub struct MapKeyServerSet { + nodes: BTreeMap, + } + + impl MapKeyServerSet { + pub fn new(nodes: BTreeMap) -> Self { + MapKeyServerSet { + nodes: nodes, + } + } + } + + impl KeyServerSet for MapKeyServerSet { + fn get(&self) -> BTreeMap { + self.nodes.clone() + } + } +} diff --git a/secret_store/src/lib.rs b/secret_store/src/lib.rs index f8a74dd1a..9750f7223 100644 --- a/secret_store/src/lib.rs +++ b/secret_store/src/lib.rs @@ -21,6 +21,8 @@ extern crate log; extern crate futures; extern crate futures_cpupool; extern crate hyper; +#[macro_use] +extern crate lazy_static; extern crate parking_lot; extern crate rustc_hex; extern crate serde; @@ -56,6 +58,7 @@ mod http_listener; mod key_server; mod key_storage; mod serialization; +mod key_server_set; use std::sync::Arc; use ethcore::client::Client; @@ -68,9 +71,10 @@ pub use traits::{KeyServer}; pub fn start(client: Arc, config: ServiceConfiguration) -> Result, Error> { use std::sync::Arc; - let acl_storage = Arc::new(acl_storage::OnChainAclStorage::new(client)); + let acl_storage = acl_storage::OnChainAclStorage::new(&client); + let key_server_set = key_server_set::OnChainKeyServerSet::new(&client, config.cluster_config.nodes.clone())?; let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(&config)?); - let key_server = key_server::KeyServerImpl::new(&config.cluster_config, acl_storage, key_storage)?; + let key_server = key_server::KeyServerImpl::new(&config.cluster_config, key_server_set, acl_storage, key_storage)?; let listener = http_listener::KeyServerHttpListener::start(&config.listener_address, key_server)?; Ok(Box::new(listener)) }
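The `OnChainAclStorage` change above replaces an eagerly held `Arc<Client>` and lazily resolved contract with a `CachedContract` that keeps only a `Weak<Client>` and re-resolves the registry entry whenever `ChainNotify::new_blocks` reports enacted or retracted blocks. A minimal, self-contained sketch of that pattern, using stand-in types (`Client`, `Address`, the registry map are simplified assumptions, not the real Parity APIs):

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Weak};

// Stand-ins for the real client and registry types.
type Address = String;

struct Client {
    registry: BTreeMap<String, Address>,
}

impl Client {
    fn registry_address(&self, name: &str) -> Option<Address> {
        self.registry.get(name).cloned()
    }
}

/// Cache that re-reads the contract address only when it changes.
struct CachedContract {
    client: Weak<Client>, // weak: the cache must not keep the client alive
    contract_addr: Option<Address>,
}

impl CachedContract {
    fn new(client: &Arc<Client>) -> Self {
        CachedContract { client: Arc::downgrade(client), contract_addr: None }
    }

    /// Called from a `new_blocks`-style notification.
    fn update(&mut self) {
        if let Some(client) = self.client.upgrade() {
            let new_addr = client.registry_address("secretstore_acl_checker");
            if self.contract_addr != new_addr {
                // (re)configure the contract binding here
                self.contract_addr = new_addr;
            }
        }
    }
}

fn main() {
    let mut registry = BTreeMap::new();
    registry.insert("secretstore_acl_checker".to_string(), "0x01".to_string());
    let client = Arc::new(Client { registry });
    let mut cache = CachedContract::new(&client);
    cache.update();
    assert_eq!(cache.contract_addr.as_deref(), Some("0x01"));
}
```

Holding a `Weak` reference avoids a reference cycle between the client and the listener registered via `add_notify`.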
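The cluster no longer copies a fixed `nodes` map at construction time; it holds an `Arc` to a `KeyServerSet` and re-queries it. A sketch of the trait and an in-memory implementation in the spirit of the `MapKeyServerSet` test helper, with `NodeId` simplified to a `String` (an assumption; the real code uses `ethkey::Public`):

```rust
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::sync::Arc;

type NodeId = String;

/// Source of the current key-server set (mirrors the trait added in key_server_set.rs).
trait KeyServerSet: Send + Sync {
    fn get(&self) -> BTreeMap<NodeId, SocketAddr>;
}

/// In-memory implementation, analogous to the `MapKeyServerSet` test helper.
struct MapKeyServerSet {
    nodes: BTreeMap<NodeId, SocketAddr>,
}

impl KeyServerSet for MapKeyServerSet {
    fn get(&self) -> BTreeMap<NodeId, SocketAddr> {
        self.nodes.clone()
    }
}

/// Consumers such as the cluster keep an `Arc<KeyServerSet>` and re-query it,
/// instead of copying a fixed node map when they are constructed.
fn disconnected_nodes(
    set: &Arc<dyn KeyServerSet>,
    connected: &BTreeMap<NodeId, SocketAddr>,
) -> BTreeMap<NodeId, SocketAddr> {
    set.get().into_iter().filter(|(id, _)| !connected.contains_key(id)).collect()
}

fn main() {
    let mut nodes = BTreeMap::new();
    nodes.insert("node1".to_string(), "127.0.0.1:8083".parse().unwrap());
    let set: Arc<dyn KeyServerSet> = Arc::new(MapKeyServerSet { nodes });
    assert_eq!(disconnected_nodes(&set, &BTreeMap::new()).len(), 1);
}
```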
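`ClusterConnections::update_nodes_set` reconciles the cached node map with the latest set: nodes that disappeared or changed address lose their connections, new nodes become connect targets. A standalone sketch of that diffing step over plain `std` types (it deliberately omits the self-node and empty-set handling from the real code):

```rust
use std::collections::{BTreeMap, BTreeSet};
use std::net::SocketAddr;

type NodeId = String;

/// Returns (added, removed, changed) counts, mirroring the bookkeeping in
/// `update_nodes_set`; `connections` loses entries for nodes that disappeared
/// or changed address.
fn update_nodes_set(
    nodes: &mut BTreeMap<NodeId, SocketAddr>,
    connections: &mut BTreeSet<NodeId>,
    new_nodes: BTreeMap<NodeId, SocketAddr>,
) -> (usize, usize, usize) {
    let (mut added, mut removed, mut changed) = (0, 0, 0);

    // Drop nodes that are no longer part of the set.
    for obsolete in nodes.keys().cloned().collect::<Vec<_>>() {
        if !new_nodes.contains_key(&obsolete) {
            connections.remove(&obsolete);
            nodes.remove(&obsolete);
            removed += 1;
        }
    }

    // Insert new nodes; a changed address also invalidates the old connection.
    for (id, addr) in new_nodes {
        match nodes.insert(id.clone(), addr) {
            None => added += 1,
            Some(old_addr) if old_addr != addr => {
                connections.remove(&id);
                changed += 1;
            }
            Some(_) => {}
        }
    }

    (added, removed, changed)
}

fn main() {
    let mut nodes = BTreeMap::new();
    nodes.insert("a".to_string(), "127.0.0.1:8083".parse().unwrap());
    let mut connections = BTreeSet::new();
    connections.insert("a".to_string());
    let mut new_nodes = BTreeMap::new();
    new_nodes.insert("b".to_string(), "127.0.0.1:8084".parse().unwrap());
    assert_eq!(update_nodes_set(&mut nodes, &mut connections, new_nodes), (1, 1, 0));
    assert!(connections.is_empty());
}
```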
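On the accepting side, `accept_handshake` no longer takes a trusted-node set: `trusted_nodes` becomes an `Option`, `None` lets any peer complete the handshake, and unknown nodes are rejected later in `ClusterConnections::insert`. The check reduces to the following (a sketch with `NodeId` again simplified to a `String`):

```rust
use std::collections::BTreeSet;

type NodeId = String;

/// `None` means "accept anyone at handshake time" (the accepting side);
/// `Some(set)` keeps the old behaviour for outgoing connections.
fn is_trusted(trusted_nodes: &Option<BTreeSet<NodeId>>, peer: &NodeId) -> bool {
    trusted_nodes.as_ref().map(|tn| tn.contains(peer)).unwrap_or(true)
}

fn main() {
    let open: Option<BTreeSet<NodeId>> = None;
    let mut restricted_set = BTreeSet::new();
    restricted_set.insert("a".to_string());
    let restricted = Some(restricted_set);
    assert!(is_trusted(&open, &"b".to_string()));
    assert!(!is_trusted(&restricted, &"b".to_string()));
}
```

The outgoing path still passes `Some(trusted_nodes)`, so connections initiated by the node remain restricted to the known set.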
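`read_from_registry` asks the key-server-set contract for its server list and keeps only the entries whose public key and socket address both parse, logging and skipping the rest. A sketch of that filtering loop; the fetcher closures stand in for the `get_key_server_public` / `get_key_server_address` contract calls and are assumptions for illustration, not the real bindings:

```rust
use std::collections::BTreeMap;
use std::net::SocketAddr;

// Stand-in for ethkey::Public.
type Public = String;

/// Build the node map, skipping any server whose public key or address fails to parse.
fn read_key_servers<I, P, A>(servers: I, fetch_public: P, fetch_addr: A) -> BTreeMap<Public, SocketAddr>
where
    I: IntoIterator<Item = u64>,
    P: Fn(u64) -> Result<Public, String>,
    A: Fn(u64) -> Result<String, String>,
{
    let mut key_servers = BTreeMap::new();
    for server in servers {
        let public = fetch_public(server);
        let addr = fetch_addr(server)
            .and_then(|a| a.parse::<SocketAddr>().map_err(|e| e.to_string()));
        // Only add successfully parsed nodes, as in the contract-reading loop above.
        match (public, addr) {
            (Ok(public), Ok(addr)) => { key_servers.insert(public, addr); },
            (Err(err), _) => eprintln!("invalid public for server {}: {}", server, err),
            (_, Err(err)) => eprintln!("invalid address for server {}: {}", server, err),
        }
    }
    key_servers
}

fn main() {
    let set = read_key_servers(
        vec![1, 2],
        |id| Ok(format!("pub{}", id)),
        |id| if id == 1 { Ok("127.0.0.1:8083".to_string()) } else { Err("no address".to_string()) },
    );
    assert_eq!(set.len(), 1);
}
```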