Merge branch 'master' into ui-2

Jaco Greeff 2017-08-09 09:32:22 +02:00
commit a11e9acd12
42 changed files with 898 additions and 463 deletions

.github/ISSUE_TEMPLATE.md (new file)

@ -0,0 +1,12 @@
_Before filing a new issue, please **provide the following information**._
> I'm running:
>
> - **Parity version**: 0.0.0
> - **Operating system**: Windows / MacOS / Linux
> - **And installed**: via installer / homebrew / binaries / from source
_Your issue description goes here below. Try to include **actual** vs. **expected behavior** and **steps to reproduce** the issue._
---

Cargo.lock (generated)

@ -723,6 +723,7 @@ dependencies = [
"futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.10.5 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"native-contracts 0.1.0",
"parking_lot 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1580,7 +1581,7 @@ version = "0.1.0"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethabi 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-util 1.8.0",
"ethcore-bigint 0.1.3",
"futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"native-contract-generator 0.1.0",
]
@ -1786,7 +1787,7 @@ dependencies = [
[[package]]
name = "parity"
version = "1.7.0"
version = "1.8.0"
dependencies = [
"ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"app_dirs 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2052,7 +2053,7 @@ dependencies = [
[[package]]
name = "parity-ui-precompiled"
version = "1.4.0"
source = "git+https://github.com/paritytech/js-precompiled.git#dfb9367a495d5ca3eac3c92a4197cf8652756d37"
source = "git+https://github.com/paritytech/js-precompiled.git#dd9b92d9d8c244678e15163347f9adb2e2560959"
dependencies = [
"parity-dapps-glue 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -2208,10 +2209,10 @@ dependencies = [
name = "price-info"
version = "1.7.0"
dependencies = [
"ethcore-util 1.8.0",
"fetch 0.1.0",
"futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
]


@ -1,7 +1,7 @@
[package]
description = "Parity Ethereum client"
name = "parity"
version = "1.7.0"
version = "1.8.0"
license = "GPL-3.0"
authors = ["Parity Technologies <admin@parity.io>"]
build = "build.rs"


@ -1,55 +1,41 @@
# [Parity](https://parity.io/parity.html)
### Fast, light, and robust Ethereum implementation
# [Parity](https://parity.io/parity.html) - fast, light, and robust Ethereum client
### [Download latest release](https://github.com/paritytech/parity/releases)
[![build status](https://gitlab.parity.io/parity/parity/badges/master/build.svg)](https://gitlab.parity.io/parity/parity/commits/master)
[![Snap Status](https://build.snapcraft.io/badge/paritytech/parity.svg)](https://build.snapcraft.io/user/paritytech/parity)
[![GPLv3](https://img.shields.io/badge/license-GPL%20v3-green.svg)](https://www.gnu.org/licenses/gpl-3.0.en.html)
[![build status](https://gitlab.parity.io/parity/parity/badges/master/build.svg)](https://gitlab.parity.io/parity/parity/commits/master) [![Coverage Status][coveralls-image]][coveralls-url] [![GPLv3][license-image]][license-url] [![Snap Status](https://build.snapcraft.io/badge/paritytech/parity.svg)](https://build.snapcraft.io/user/paritytech/parity)
- [Download the latest release here.](https://github.com/paritytech/parity/releases)
### Join the chat!
Parity [![Join the chat at https://gitter.im/ethcore/parity][gitter-image]][gitter-url] and
parity.js [![Join the chat at https://gitter.im/ethcore/parity.js](https://badges.gitter.im/ethcore/parity.js.svg)](https://gitter.im/ethcore/parity.js?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[Internal Documentation][doc-url]
Be sure to check out [our wiki][wiki-url] for more information.
[coveralls-image]: https://coveralls.io/repos/github/paritytech/parity/badge.svg?branch=master
[coveralls-url]: https://coveralls.io/github/paritytech/parity?branch=master
[gitter-image]: https://badges.gitter.im/Join%20Chat.svg
[gitter-url]: https://gitter.im/ethcore/parity?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge
[license-image]: https://img.shields.io/badge/license-GPL%20v3-green.svg
[license-url]: https://www.gnu.org/licenses/gpl-3.0.en.html
[doc-url]: https://paritytech.github.io/parity/ethcore/index.html
[wiki-url]: https://github.com/paritytech/parity/wiki
Get in touch with us on Gitter:
[![Gitter: Parity](https://img.shields.io/badge/gitter-parity-4AB495.svg)](https://gitter.im/paritytech/parity)
[![Gitter: Parity.js](https://img.shields.io/badge/gitter-parity.js-4AB495.svg)](https://gitter.im/paritytech/parity.js)
[![Gitter: Parity/Miners](https://img.shields.io/badge/gitter-parity/miners-4AB495.svg)](https://gitter.im/paritytech/parity/miners)
[![Gitter: Parity-PoA](https://img.shields.io/badge/gitter-parity--poa-4AB495.svg)](https://gitter.im/paritytech/parity-poa)
Be sure to check out [our wiki](https://github.com/paritytech/parity/wiki) and the [internal documentation](https://paritytech.github.io/parity/ethcore/index.html) for more information.
----
## About Parity
Parity's goal is to be the fastest, lightest, and most secure Ethereum client. We are developing Parity using the sophisticated and
cutting-edge Rust programming language. Parity is licensed under the GPLv3, and can be used for all your Ethereum needs.
Parity's goal is to be the fastest, lightest, and most secure Ethereum client. We are developing Parity using the sophisticated and cutting-edge Rust programming language. Parity is licensed under the GPLv3, and can be used for all your Ethereum needs.
Parity comes with a built-in wallet. To access [Parity Wallet](http://web3.site/) simply go to http://web3.site/ (if you don't have access to the internet, but still want to use the service, you can also use http://127.0.0.1:8180/). It includes various functionality allowing you to:
Parity comes with a built-in wallet. To access [Parity Wallet](http://web3.site/) simply go to http://web3.site/ (if you don't have access to the internet, but still want to use the service, you can also use http://127.0.0.1:8180/). It
includes various functionality allowing you to:
- create and manage your Ethereum accounts;
- manage your Ether and any Ethereum tokens;
- create and register your own tokens;
- and much more.
By default, Parity will also run a JSONRPC server on `127.0.0.1:8545`. This is fully configurable and supports a number
of RPC APIs.
By default, Parity will also run a JSONRPC server on `127.0.0.1:8545`. This is fully configurable and supports a number of RPC APIs.
If you run into an issue while using parity, feel free to file one in this repository
or hop on our [gitter chat room][gitter-url] to ask a question. We are glad to help!
If you run into an issue while using parity, feel free to file one in this repository or hop on our [gitter chat room](https://gitter.im/paritytech/parity) to ask a question. We are glad to help!
**For security-critical issues**, please refer to the security policy outlined in `SECURITY.MD`.
Parity's current release is 1.6. You can download it at https://github.com/paritytech/parity/releases or follow the instructions
below to build from source.
Parity's current release is 1.7. You can download it at https://github.com/paritytech/parity/releases or follow the instructions below to build from source.
----
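The README above notes that Parity exposes a JSON-RPC server on `127.0.0.1:8545` by default. As a minimal illustration (not part of this commit), that endpoint can be exercised with nothing beyond the Rust standard library; the sketch below assumes a node is running locally with the default HTTP RPC settings and uses the standard `web3_clientVersion` method.

```rust
// Minimal JSON-RPC call to a locally running Parity node (std only).
// Assumes the default HTTP RPC endpoint at 127.0.0.1:8545 is enabled.
use std::io::{Read, Write};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    let body = r#"{"jsonrpc":"2.0","method":"web3_clientVersion","params":[],"id":1}"#;
    let request = format!(
        "POST / HTTP/1.1\r\nHost: 127.0.0.1:8545\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
        body.len(),
        body
    );

    let mut stream = TcpStream::connect("127.0.0.1:8545")?;
    stream.write_all(request.as_bytes())?;

    // Read the raw HTTP response; the JSON-RPC result is in the body.
    let mut response = String::new();
    stream.read_to_string(&mut response)?;
    println!("{}", response);
    Ok(())
}
```

Changing the `method` and `params` fields reaches any of the other RPC APIs the README refers to.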


@ -21,7 +21,7 @@ use hyper::method::Method;
use hyper::status::StatusCode;
use api::{response, types};
use api::time::TimeChecker;
use api::time::{TimeChecker, MAX_DRIFT};
use apps::fetcher::Fetcher;
use handlers::{self, extract_url};
use endpoint::{Endpoint, Handler, EndpointPath};
@ -122,7 +122,6 @@ impl RestApiRouter {
// Check time
let time = {
const MAX_DRIFT: i64 = 500;
let (status, message, details) = match time {
Ok(Ok(diff)) if diff < MAX_DRIFT && diff > -MAX_DRIFT => {
(HealthStatus::Ok, "".into(), diff)


@ -33,11 +33,13 @@
use std::io;
use std::{fmt, mem, time};
use std::sync::Arc;
use std::collections::VecDeque;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use futures::{self, Future, BoxFuture};
use futures_cpupool::CpuPool;
use futures::future::{self, IntoFuture};
use futures_cpupool::{CpuPool, CpuFuture};
use ntp;
use time::{Duration, Timespec};
use util::RwLock;
@ -45,6 +47,8 @@ use util::RwLock;
/// Time checker error.
#[derive(Debug, Clone, PartialEq)]
pub enum Error {
/// No servers are currently available for a query.
NoServersAvailable,
/// There was an error when trying to reach the NTP server.
Ntp(String),
/// IO error when reading NTP response.
@ -56,6 +60,7 @@ impl fmt::Display for Error {
use self::Error::*;
match *self {
NoServersAvailable => write!(fmt, "No NTP servers available"),
Ntp(ref err) => write!(fmt, "NTP error: {}", err),
Io(ref err) => write!(fmt, "Connection Error: {}", err),
}
@ -72,58 +77,123 @@ impl From<ntp::errors::Error> for Error {
/// NTP time drift checker.
pub trait Ntp {
/// Returned Future.
type Future: IntoFuture<Item=Duration, Error=Error>;
/// Returns the current time drift.
fn drift(&self) -> BoxFuture<Duration, Error>;
fn drift(&self) -> Self::Future;
}
const SERVER_MAX_POLL_INTERVAL_SECS: u64 = 60;
#[derive(Debug)]
struct Server {
pub address: String,
next_call: RwLock<time::Instant>,
failures: AtomicUsize,
}
impl Server {
pub fn is_available(&self) -> bool {
*self.next_call.read() < time::Instant::now()
}
pub fn report_success(&self) {
self.failures.store(0, atomic::Ordering::SeqCst);
self.update_next_call(1)
}
pub fn report_failure(&self) {
let errors = self.failures.fetch_add(1, atomic::Ordering::SeqCst);
self.update_next_call(1 << errors)
}
fn update_next_call(&self, delay: usize) {
*self.next_call.write() = time::Instant::now() + time::Duration::from_secs(delay as u64 * SERVER_MAX_POLL_INTERVAL_SECS);
}
}
impl<T: AsRef<str>> From<T> for Server {
fn from(t: T) -> Self {
Server {
address: t.as_ref().to_owned(),
next_call: RwLock::new(time::Instant::now()),
failures: Default::default(),
}
}
}
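Aside (an illustration, not part of the diff): `report_failure` above reads the previous failure count and shifts the base interval by it, so repeated failures push a server's next poll out exponentially, while a success resets it to the base interval. A standalone sketch of that schedule, reusing the 60-second constant from the code above:

```rust
// Standalone sketch of the back-off schedule implemented by `Server` above.
use std::sync::atomic::{AtomicUsize, Ordering};

const SERVER_MAX_POLL_INTERVAL_SECS: u64 = 60;

struct Backoff {
    failures: AtomicUsize,
}

impl Backoff {
    fn new() -> Self {
        Backoff { failures: AtomicUsize::new(0) }
    }

    // Mirrors `report_failure`: read the old count, bump it, and double the delay.
    fn report_failure(&self) -> u64 {
        let errors = self.failures.fetch_add(1, Ordering::SeqCst);
        (1u64 << errors) * SERVER_MAX_POLL_INTERVAL_SECS
    }

    // Mirrors `report_success`: reset the count and fall back to the base interval.
    fn report_success(&self) -> u64 {
        self.failures.store(0, Ordering::SeqCst);
        SERVER_MAX_POLL_INTERVAL_SECS
    }
}

fn main() {
    let b = Backoff::new();
    for _ in 0..4 {
        println!("next poll in {}s", b.report_failure()); // 60, 120, 240, 480
    }
    println!("after success: next poll in {}s", b.report_success()); // 60
}
```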
/// NTP client using the SNTP algorithm for calculating drift.
#[derive(Clone)]
pub struct SimpleNtp {
address: Arc<String>,
addresses: Vec<Arc<Server>>,
pool: CpuPool,
}
impl fmt::Debug for SimpleNtp {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Ntp {{ address: {} }}", self.address)
f
.debug_struct("SimpleNtp")
.field("addresses", &self.addresses)
.finish()
}
}
impl SimpleNtp {
fn new(address: &str, pool: CpuPool) -> SimpleNtp {
fn new<T: AsRef<str>>(addresses: &[T], pool: CpuPool) -> SimpleNtp {
SimpleNtp {
address: Arc::new(address.to_owned()),
addresses: addresses.iter().map(Server::from).map(Arc::new).collect(),
pool: pool,
}
}
}
impl Ntp for SimpleNtp {
fn drift(&self) -> BoxFuture<Duration, Error> {
let address = self.address.clone();
if &*address == "none" {
return futures::future::err(Error::Ntp("NTP server is not provided.".into())).boxed();
}
type Future = future::Either<
CpuFuture<Duration, Error>,
future::FutureResult<Duration, Error>,
>;
self.pool.spawn_fn(move || {
let packet = ntp::request(&*address)?;
let dest_time = ::time::now_utc().to_timespec();
let orig_time = Timespec::from(packet.orig_time);
let recv_time = Timespec::from(packet.recv_time);
let transmit_time = Timespec::from(packet.transmit_time);
fn drift(&self) -> Self::Future {
use self::future::Either::{A, B};
let drift = ((recv_time - orig_time) + (transmit_time - dest_time)) / 2;
let server = self.addresses.iter().find(|server| server.is_available());
server.map(|server| {
let server = server.clone();
A(self.pool.spawn_fn(move || {
debug!(target: "dapps", "Fetching time from {}.", server.address);
Ok(drift)
}).boxed()
match ntp::request(&server.address) {
Ok(packet) => {
let dest_time = ::time::now_utc().to_timespec();
let orig_time = Timespec::from(packet.orig_time);
let recv_time = Timespec::from(packet.recv_time);
let transmit_time = Timespec::from(packet.transmit_time);
let drift = ((recv_time - orig_time) + (transmit_time - dest_time)) / 2;
server.report_success();
Ok(drift)
},
Err(err) => {
server.report_failure();
Err(err.into())
},
}
}))
}).unwrap_or_else(|| B(future::err(Error::NoServersAvailable)))
}
}
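For reference (an explanatory note, not text from the commit): the drift computed above, `((recv_time - orig_time) + (transmit_time - dest_time)) / 2`, is the standard SNTP clock-offset formula. `orig_time` is when the client sent the request, `recv_time` and `transmit_time` are when the server received and answered it, and `dest_time` is when the reply arrived back at the client; averaging the two one-way differences cancels the network delay, assuming it is roughly symmetric in both directions.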
// NOTE In a positive scenario first results will be seen after:
// MAX_RESULTS * UPDATE_TIMEOUT_OK_SECS seconds.
const MAX_RESULTS: usize = 7;
const UPDATE_TIMEOUT_OK_SECS: u64 = 30;
const UPDATE_TIMEOUT_ERR_SECS: u64 = 2;
// MAX_RESULTS * UPDATE_TIMEOUT_INCOMPLETE_SECS seconds.
const MAX_RESULTS: usize = 4;
const UPDATE_TIMEOUT_OK_SECS: u64 = 6 * 60 * 60;
const UPDATE_TIMEOUT_WARN_SECS: u64 = 15 * 60;
const UPDATE_TIMEOUT_ERR_SECS: u64 = 60;
const UPDATE_TIMEOUT_INCOMPLETE_SECS: u64 = 10;
/// Maximal valid time drift.
pub const MAX_DRIFT: i64 = 500;
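In concrete terms (an illustration, not part of the commit): with the new constants a node first fills the window of `MAX_RESULTS = 4` samples after roughly `4 * UPDATE_TIMEOUT_INCOMPLETE_SECS = 40` seconds. Once the window is full, checks that stay under `MAX_DRIFT` are repeated only every 6 hours (`UPDATE_TIMEOUT_OK_SECS`), drifting results are re-checked after 15 minutes, and failures after 1 minute.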
#[derive(Debug, Clone)]
/// A time checker.
@ -134,13 +204,13 @@ pub struct TimeChecker<N: Ntp = SimpleNtp> {
impl TimeChecker<SimpleNtp> {
/// Creates new time checker given the NTP server address.
pub fn new(ntp_address: String, pool: CpuPool) -> Self {
pub fn new<T: AsRef<str>>(ntp_addresses: &[T], pool: CpuPool) -> Self {
let last_result = Arc::new(RwLock::new(
// Assume everything is ok at the very beginning.
(time::Instant::now(), vec![Ok(0)].into())
));
let ntp = SimpleNtp::new(&ntp_address, pool);
let ntp = SimpleNtp::new(ntp_addresses, pool);
TimeChecker {
ntp,
@ -149,22 +219,34 @@ impl TimeChecker<SimpleNtp> {
}
}
impl<N: Ntp> TimeChecker<N> {
impl<N: Ntp> TimeChecker<N> where <N::Future as IntoFuture>::Future: Send + 'static {
/// Updates the time
pub fn update(&self) -> BoxFuture<i64, Error> {
trace!(target: "dapps", "Updating time from NTP.");
let last_result = self.last_result.clone();
self.ntp.drift().then(move |res| {
self.ntp.drift().into_future().then(move |res| {
let res = res.map(|d| d.num_milliseconds());
if let Err(Error::NoServersAvailable) = res {
debug!(target: "dapps", "No NTP servers available. Selecting an older result.");
return select_result(last_result.read().1.iter());
}
// Update the results.
let mut results = mem::replace(&mut last_result.write().1, VecDeque::new());
let has_all_results = results.len() >= MAX_RESULTS;
let valid_till = time::Instant::now() + time::Duration::from_secs(
if res.is_ok() && results.len() == MAX_RESULTS {
UPDATE_TIMEOUT_OK_SECS
} else {
UPDATE_TIMEOUT_ERR_SECS
match res {
Ok(time) if has_all_results && time < MAX_DRIFT => UPDATE_TIMEOUT_OK_SECS,
Ok(_) if has_all_results => UPDATE_TIMEOUT_WARN_SECS,
Err(_) if has_all_results => UPDATE_TIMEOUT_ERR_SECS,
_ => UPDATE_TIMEOUT_INCOMPLETE_SECS,
}
);
trace!(target: "dapps", "New time drift received: {:?}", res);
// Push the result.
results.push_back(res.map(|d| d.num_milliseconds()));
results.push_back(res);
while results.len() > MAX_RESULTS {
results.pop_front();
}
@ -209,7 +291,7 @@ mod tests {
use std::cell::{Cell, RefCell};
use std::time::Instant;
use time::Duration;
use futures::{self, BoxFuture, Future};
use futures::{future, Future};
use super::{Ntp, TimeChecker, Error};
use util::RwLock;
@ -224,9 +306,11 @@ mod tests {
}
impl Ntp for FakeNtp {
fn drift(&self) -> BoxFuture<Duration, Error> {
type Future = future::FutureResult<Duration, Error>;
fn drift(&self) -> Self::Future {
self.1.set(self.1.get() + 1);
futures::future::ok(self.0.borrow_mut().pop().expect("Unexpected call to drift().")).boxed()
future::ok(self.0.borrow_mut().pop().expect("Unexpected call to drift()."))
}
}


@ -130,7 +130,7 @@ impl Middleware {
/// Creates new middleware for UI server.
pub fn ui<F: Fetch>(
ntp_server: &str,
ntp_servers: &[String],
pool: CpuPool,
remote: Remote,
dapps_domain: &str,
@ -146,7 +146,7 @@ impl Middleware {
).embeddable_on(None).allow_dapps(false));
let special = {
let mut special = special_endpoints(
ntp_server,
ntp_servers,
pool,
content_fetcher.clone(),
remote.clone(),
@ -171,7 +171,7 @@ impl Middleware {
/// Creates new Dapps server middleware.
pub fn dapps<F: Fetch>(
ntp_server: &str,
ntp_servers: &[String],
pool: CpuPool,
remote: Remote,
ui_address: Option<(String, u16)>,
@ -203,7 +203,7 @@ impl Middleware {
let special = {
let mut special = special_endpoints(
ntp_server,
ntp_servers,
pool,
content_fetcher.clone(),
remote.clone(),
@ -237,8 +237,8 @@ impl http::RequestMiddleware for Middleware {
}
}
fn special_endpoints(
ntp_server: &str,
fn special_endpoints<T: AsRef<str>>(
ntp_servers: &[T],
pool: CpuPool,
content_fetcher: Arc<apps::fetcher::Fetcher>,
remote: Remote,
@ -250,7 +250,7 @@ fn special_endpoints(
special.insert(router::SpecialEndpoint::Api, Some(api::RestApi::new(
content_fetcher,
sync_status,
api::TimeChecker::new(ntp_server.into(), pool),
api::TimeChecker::new(ntp_servers, pool),
remote,
)));
special


@ -255,7 +255,7 @@ impl Server {
fetch: F,
) -> Result<Server, http::Error> {
let middleware = Middleware::dapps(
"pool.ntp.org:123",
&["0.pool.ntp.org:123".into(), "1.pool.ntp.org:123".into()],
CpuPool::new(4),
remote,
signer_address,


@ -9,7 +9,7 @@ build = "build.rs"
ethabi = "2.0"
futures = "0.1"
byteorder = "1.0"
ethcore-util = { path = "../../util" }
ethcore-bigint = { path = "../../util/bigint" }
[build-dependencies]
native-contract-generator = { path = "generator" }


@ -21,6 +21,7 @@ use std::fs::File;
use std::io::Write;
// TODO: just walk the "res" directory and generate whole crate automatically.
const KEY_SERVER_SET_ABI: &'static str = include_str!("res/key_server_set.json");
const REGISTRY_ABI: &'static str = include_str!("res/registrar.json");
const URLHINT_ABI: &'static str = include_str!("res/urlhint.json");
const SERVICE_TRANSACTION_ABI: &'static str = include_str!("res/service_transaction.json");
@ -45,6 +46,7 @@ fn build_test_contracts() {
}
fn main() {
build_file("KeyServerSet", KEY_SERVER_SET_ABI, "key_server_set.rs");
build_file("Registry", REGISTRY_ABI, "registry.rs");
build_file("Urlhint", URLHINT_ABI, "urlhint.rs");
build_file("ServiceTransactionChecker", SERVICE_TRANSACTION_ABI, "service_transaction.rs");


@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Rust code contract generator.
//! The code generated will require a dependence on the `ethcore-util`,
//! The code generated will require a dependence on the `ethcore-bigint::prelude`,
//! `ethabi`, `byteorder`, and `futures` crates.
//! This currently isn't hygienic, so compilation of generated code may fail
//! due to missing crates or name collisions. This will change when
@ -48,14 +48,14 @@ pub fn generate_module(struct_name: &str, abi: &str) -> Result<String, Error> {
use byteorder::{{BigEndian, ByteOrder}};
use futures::{{future, Future, IntoFuture, BoxFuture}};
use ethabi::{{Contract, Interface, Token, Event}};
use util;
use bigint;
/// Generated Rust bindings to an Ethereum contract.
#[derive(Clone, Debug)]
pub struct {name} {{
contract: Contract,
/// Address to make calls to.
pub address: util::Address,
pub address: bigint::prelude::H160,
}}
const ABI: &'static str = r#"{abi_str}"#;
@ -63,7 +63,7 @@ const ABI: &'static str = r#"{abi_str}"#;
impl {name} {{
/// Create a new instance of `{name}` with an address.
/// Calls can be made, given a callback for dispatching calls asynchronously.
pub fn new(address: util::Address) -> Self {{
pub fn new(address: bigint::prelude::H160) -> Self {{
let contract = Contract::new(Interface::load(ABI.as_bytes())
.expect("ABI checked at generation-time; qed"));
{name} {{
@ -108,7 +108,7 @@ fn generate_functions(contract: &Contract) -> Result<String, Error> {
/// Outputs: {abi_outputs:?}
pub fn {snake_name}<F, U>(&self, call: F, {params}) -> BoxFuture<{output_type}, String>
where
F: FnOnce(util::Address, Vec<u8>) -> U,
F: FnOnce(bigint::prelude::H160, Vec<u8>) -> U,
U: IntoFuture<Item=Vec<u8>, Error=String>,
U::Future: Send + 'static
{{
@ -217,8 +217,8 @@ fn output_params_codegen(outputs: &[ParamType]) -> Result<(String, String), Para
// create code for an argument type from param type.
fn rust_type(input: ParamType) -> Result<String, ParamType> {
Ok(match input {
ParamType::Address => "util::Address".into(),
ParamType::FixedBytes(len) if len <= 32 => format!("util::H{}", len * 8),
ParamType::Address => "bigint::prelude::H160".into(),
ParamType::FixedBytes(len) if len <= 32 => format!("bigint::prelude::H{}", len * 8),
ParamType::Bytes | ParamType::FixedBytes(_) => "Vec<u8>".into(),
ParamType::Int(width) => match width {
8 | 16 | 32 | 64 => format!("i{}", width),
@ -226,7 +226,7 @@ fn rust_type(input: ParamType) -> Result<String, ParamType> {
},
ParamType::Uint(width) => match width {
8 | 16 | 32 | 64 => format!("u{}", width),
128 | 160 | 256 => format!("util::U{}", width),
128 | 160 | 256 => format!("bigint::prelude::U{}", width),
_ => return Err(ParamType::Uint(width)),
},
ParamType::Bool => "bool".into(),
@ -259,8 +259,8 @@ fn tokenize(name: &str, input: ParamType) -> (bool, String) {
},
ParamType::Uint(width) => format!(
"let mut r = [0; 32]; {}.to_big_endian(&mut r); Token::Uint(r)",
if width <= 64 { format!("util::U256::from({} as u64)", name) }
else { format!("util::U256::from({})", name) }
if width <= 64 { format!("bigint::prelude::U256::from({} as u64)", name) }
else { format!("bigint::prelude::U256::from({})", name) }
),
ParamType::Bool => format!("Token::Bool({})", name),
ParamType::String => format!("Token::String({})", name),
@ -281,11 +281,11 @@ fn tokenize(name: &str, input: ParamType) -> (bool, String) {
// panics on unsupported types.
fn detokenize(name: &str, output_type: ParamType) -> String {
match output_type {
ParamType::Address => format!("{}.to_address().map(util::H160)", name),
ParamType::Address => format!("{}.to_address().map(bigint::prelude::H160)", name),
ParamType::Bytes => format!("{}.to_bytes()", name),
ParamType::FixedBytes(len) if len <= 32 => {
// ensure no panic on slice too small.
let read_hash = format!("b.resize({}, 0); util::H{}::from_slice(&b[..{}])",
let read_hash = format!("b.resize({}, 0); bigint::prelude::H{}::from_slice(&b[..{}])",
len, len * 8, len);
format!("{}.to_fixed_bytes().map(|mut b| {{ {} }})",
@ -302,8 +302,8 @@ fn detokenize(name: &str, output_type: ParamType) -> String {
}
ParamType::Uint(width) => {
let read_uint = match width {
8 | 16 | 32 | 64 => format!("util::U256(u).low_u64() as u{}", width),
_ => format!("util::U{}::from(&u[..])", width),
8 | 16 | 32 | 64 => format!("bigint::prelude::U256(u).low_u64() as u{}", width),
_ => format!("bigint::prelude::U{}::from(&u[..])", width),
};
format!("{}.to_uint().map(|u| {})", name, read_uint)
@ -328,30 +328,30 @@ mod tests {
#[test]
fn input_types() {
assert_eq!(::input_params_codegen(&[]).unwrap().0, "");
assert_eq!(::input_params_codegen(&[ParamType::Address]).unwrap().0, "param_0: util::Address, ");
assert_eq!(::input_params_codegen(&[ParamType::Address]).unwrap().0, "param_0: bigint::prelude::H160, ");
assert_eq!(::input_params_codegen(&[ParamType::Address, ParamType::Bytes]).unwrap().0,
"param_0: util::Address, param_1: Vec<u8>, ");
"param_0: bigint::prelude::H160, param_1: Vec<u8>, ");
}
#[test]
fn output_types() {
assert_eq!(::output_params_codegen(&[]).unwrap().0, "()");
assert_eq!(::output_params_codegen(&[ParamType::Address]).unwrap().0, "(util::Address)");
assert_eq!(::output_params_codegen(&[ParamType::Address]).unwrap().0, "(bigint::prelude::H160)");
assert_eq!(::output_params_codegen(&[ParamType::Address, ParamType::Array(Box::new(ParamType::Bytes))]).unwrap().0,
"(util::Address, Vec<Vec<u8>>)");
"(bigint::prelude::H160, Vec<Vec<u8>>)");
}
#[test]
fn rust_type() {
assert_eq!(::rust_type(ParamType::FixedBytes(32)).unwrap(), "util::H256");
assert_eq!(::rust_type(ParamType::FixedBytes(32)).unwrap(), "bigint::prelude::H256");
assert_eq!(::rust_type(ParamType::Array(Box::new(ParamType::FixedBytes(32)))).unwrap(),
"Vec<util::H256>");
"Vec<bigint::prelude::H256>");
assert_eq!(::rust_type(ParamType::Uint(64)).unwrap(), "u64");
assert!(::rust_type(ParamType::Uint(63)).is_err());
assert_eq!(::rust_type(ParamType::Int(32)).unwrap(), "i32");
assert_eq!(::rust_type(ParamType::Uint(256)).unwrap(), "util::U256");
assert_eq!(::rust_type(ParamType::Uint(256)).unwrap(), "bigint::prelude::U256");
}
// codegen tests will need bootstrapping of some kind.


@ -0,0 +1 @@
[{"constant":true,"inputs":[{"name":"","type":"uint256"}],"name":"keyServersList","outputs":[{"name":"","type":"address"}],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"_new","type":"address"}],"name":"setOwner","outputs":[],"payable":false,"type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getKeyServerPublic","outputs":[{"name":"","type":"bytes"}],"payable":false,"type":"function"},{"constant":true,"inputs":[],"name":"getKeyServers","outputs":[{"name":"","type":"address[]"}],"payable":false,"type":"function"},{"constant":true,"inputs":[],"name":"owner","outputs":[{"name":"","type":"address"}],"payable":false,"type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getKeyServerAddress","outputs":[{"name":"","type":"string"}],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"keyServer","type":"address"}],"name":"removeKeyServer","outputs":[],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"keyServerPublic","type":"bytes"},{"name":"keyServerIp","type":"string"}],"name":"addKeyServer","outputs":[],"payable":false,"type":"function"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerAdded","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerRemoved","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"old","type":"address"},{"indexed":true,"name":"current","type":"address"}],"name":"NewOwner","type":"event"}]


@ -0,0 +1,21 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
#![allow(unused_mut, unused_variables, unused_imports)]
//! Secret store Key Server set contract.
include!(concat!(env!("OUT_DIR"), "/key_server_set.rs"));


@ -21,8 +21,9 @@
extern crate futures;
extern crate byteorder;
extern crate ethabi;
extern crate ethcore_util as util;
extern crate ethcore_bigint as bigint;
mod key_server_set;
mod registry;
mod urlhint;
mod service_transaction;
@ -32,6 +33,7 @@ mod validator_report;
pub mod test_contracts;
pub use self::key_server_set::KeyServerSet;
pub use self::registry::Registry;
pub use self::urlhint::Urlhint;
pub use self::service_transaction::ServiceTransactionChecker;


@ -690,6 +690,7 @@ impl Engine for AuthorityRound {
// apply immediate transitions.
if let Some(change) = self.validators.is_epoch_end(first, chain_head) {
let change = combine_proofs(chain_head.number(), &change, &[]);
return Some(change)
}


@ -643,6 +643,7 @@ impl Engine for Tendermint {
let first = chain_head.number() == 0;
if let Some(change) = self.validators.is_epoch_end(first, chain_head) {
let change = combine_proofs(chain_head.number(), &change, &[]);
return Some(change)
} else if let Some(pending) = transition_store(chain_head.hash()) {
let signal_number = chain_head.number();

js/package-lock.json (generated)

@ -1,6 +1,6 @@
{
"name": "parity.js",
"version": "1.8.10",
"version": "1.8.11",
"lockfileVersion": 1,
"requires": true,
"dependencies": {


@ -1,6 +1,6 @@
{
"name": "parity.js",
"version": "1.8.10",
"version": "1.8.11",
"main": "release/index.js",
"jsnext:main": "src/index.js",
"author": "Parity Team <admin@parity.io>",


@ -462,7 +462,7 @@
<key>OVERWRITE_PERMISSIONS</key>
<false/>
<key>VERSION</key>
<string>1.7.0</string>
<string>1.8.0</string>
</dict>
<key>UUID</key>
<string>2DCD5B81-7BAF-4DA1-9251-6274B089FD36</string>


@ -17,7 +17,7 @@
<key>CFBundlePackageType</key>
<string>APPL</string>
<key>CFBundleShortVersionString</key>
<string>1.6</string>
<string>1.8</string>
<key>CFBundleVersion</key>
<string>1</string>
<key>LSApplicationCategoryType</key>


@ -9,7 +9,7 @@
!define COMPANYNAME "Parity"
!define DESCRIPTION "Fast, light, robust Ethereum implementation"
!define VERSIONMAJOR 1
!define VERSIONMINOR 7
!define VERSIONMINOR 8
!define VERSIONBUILD 0
!define ARGS "--warp"
!define FIRST_START_ARGS "ui --warp --mode=passive"


@ -78,7 +78,7 @@ disable_periodic = true
jit = false
[misc]
ntp_server = "pool.ntp.org:123"
ntp_servers = ["0.parity.pool.ntp.org:123"]
logging = "own_tx=trace"
log_file = "/var/log/parity.log"
color = true


@ -359,8 +359,8 @@ usage! {
or |c: &Config| otry!(c.vm).jit.clone(),
// -- Miscellaneous Options
flag_ntp_server: String = "none",
or |c: &Config| otry!(c.misc).ntp_server.clone(),
flag_ntp_servers: String = "0.parity.pool.ntp.org:123,1.parity.pool.ntp.org:123,2.parity.pool.ntp.org:123,3.parity.pool.ntp.org:123",
or |c: &Config| otry!(c.misc).ntp_servers.clone().map(|vec| vec.join(",")),
flag_logging: Option<String> = None,
or |c: &Config| otry!(c.misc).logging.clone().map(Some),
flag_log_file: Option<String> = None,
@ -606,7 +606,7 @@ struct VM {
#[derive(Default, Debug, PartialEq, Deserialize)]
struct Misc {
ntp_server: Option<String>,
ntp_servers: Option<Vec<String>>,
logging: Option<String>,
log_file: Option<String>,
color: Option<bool>,
@ -919,7 +919,7 @@ mod tests {
flag_dapps_apis_all: None,
// -- Miscellaneous Options
flag_ntp_server: "none".into(),
flag_ntp_servers: "0.parity.pool.ntp.org:123,1.parity.pool.ntp.org:123,2.parity.pool.ntp.org:123,3.parity.pool.ntp.org:123".into(),
flag_version: false,
flag_logging: Some("own_tx=trace".into()),
flag_log_file: Some("/var/log/parity.log".into()),
@ -1098,7 +1098,7 @@ mod tests {
jit: Some(false),
}),
misc: Some(Misc {
ntp_server: Some("pool.ntp.org:123".into()),
ntp_servers: Some(vec!["0.parity.pool.ntp.org:123".into()]),
logging: Some("own_tx=trace".into()),
log_file: Some("/var/log/parity.log".into()),
color: Some(true),


@ -483,8 +483,10 @@ Internal Options:
--can-restart Executable will auto-restart if exiting with 69.
Miscellaneous Options:
--ntp-server HOST NTP server to provide current time (host:port). Used to verify node health.
(default: {flag_ntp_server})
--ntp-servers HOSTS Comma separated list of NTP servers to provide current time (host:port).
Used to verify node health. Parity uses pool.ntp.org NTP servers,
consider joining the pool: http://www.pool.ntp.org/join.html
(default: {flag_ntp_servers})
-l --logging LOGGING Specify the logging level. Must conform to the same
format as RUST_LOG. (default: {flag_logging:?})
--log-file FILENAME Specify a filename into which logging should be
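As a concrete example of the new flag described above (illustrative, using defaults that appear elsewhere in this diff): `--ntp-servers 0.parity.pool.ntp.org:123,1.parity.pool.ntp.org:123`, or in the TOML config file, `ntp_servers = ["0.parity.pool.ntp.org:123"]` as in the sample config shown earlier.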


@ -551,10 +551,14 @@ impl Configuration {
Ok(options)
}
fn ntp_servers(&self) -> Vec<String> {
self.args.flag_ntp_servers.split(",").map(str::to_owned).collect()
}
fn ui_config(&self) -> UiConfiguration {
UiConfiguration {
enabled: self.ui_enabled(),
ntp_server: self.args.flag_ntp_server.clone(),
ntp_servers: self.ntp_servers(),
interface: self.ui_interface(),
port: self.args.flag_ports_shift + self.args.flag_ui_port,
hosts: self.ui_hosts(),
@ -564,7 +568,7 @@ impl Configuration {
fn dapps_config(&self) -> DappsConfiguration {
DappsConfiguration {
enabled: self.dapps_enabled(),
ntp_server: self.args.flag_ntp_server.clone(),
ntp_servers: self.ntp_servers(),
dapps_path: PathBuf::from(self.directories().dapps),
extra_dapps: if self.args.cmd_dapp {
self.args.arg_path.iter().map(|path| PathBuf::from(path)).collect()
@ -1278,7 +1282,12 @@ mod tests {
support_token_api: true
}, UiConfiguration {
enabled: true,
ntp_server: "none".into(),
ntp_servers: vec![
"0.parity.pool.ntp.org:123".into(),
"1.parity.pool.ntp.org:123".into(),
"2.parity.pool.ntp.org:123".into(),
"3.parity.pool.ntp.org:123".into(),
],
interface: "127.0.0.1".into(),
port: 8180,
hosts: Some(vec![]),
@ -1521,10 +1530,16 @@ mod tests {
let conf3 = parse(&["parity", "--ui-path", "signer", "--ui-interface", "test"]);
// then
let ntp_servers = vec![
"0.parity.pool.ntp.org:123".into(),
"1.parity.pool.ntp.org:123".into(),
"2.parity.pool.ntp.org:123".into(),
"3.parity.pool.ntp.org:123".into(),
];
assert_eq!(conf0.directories().signer, "signer".to_owned());
assert_eq!(conf0.ui_config(), UiConfiguration {
enabled: true,
ntp_server: "none".into(),
ntp_servers: ntp_servers.clone(),
interface: "127.0.0.1".into(),
port: 8180,
hosts: Some(vec![]),
@ -1533,7 +1548,7 @@ mod tests {
assert_eq!(conf1.directories().signer, "signer".to_owned());
assert_eq!(conf1.ui_config(), UiConfiguration {
enabled: true,
ntp_server: "none".into(),
ntp_servers: ntp_servers.clone(),
interface: "127.0.0.1".into(),
port: 8180,
hosts: Some(vec![]),
@ -1543,7 +1558,7 @@ mod tests {
assert_eq!(conf2.directories().signer, "signer".to_owned());
assert_eq!(conf2.ui_config(), UiConfiguration {
enabled: true,
ntp_server: "none".into(),
ntp_servers: ntp_servers.clone(),
interface: "127.0.0.1".into(),
port: 3123,
hosts: Some(vec![]),
@ -1552,7 +1567,7 @@ mod tests {
assert_eq!(conf3.directories().signer, "signer".to_owned());
assert_eq!(conf3.ui_config(), UiConfiguration {
enabled: true,
ntp_server: "none".into(),
ntp_servers: ntp_servers.clone(),
interface: "test".into(),
port: 8180,
hosts: Some(vec![]),


@ -36,7 +36,7 @@ use util::{Bytes, Address};
#[derive(Debug, PartialEq, Clone)]
pub struct Configuration {
pub enabled: bool,
pub ntp_server: String,
pub ntp_servers: Vec<String>,
pub dapps_path: PathBuf,
pub extra_dapps: Vec<PathBuf>,
pub extra_embed_on: Vec<(String, u16)>,
@ -47,7 +47,12 @@ impl Default for Configuration {
let data_dir = default_data_path();
Configuration {
enabled: true,
ntp_server: "none".into(),
ntp_servers: vec![
"0.parity.pool.ntp.org:123".into(),
"1.parity.pool.ntp.org:123".into(),
"2.parity.pool.ntp.org:123".into(),
"3.parity.pool.ntp.org:123".into(),
],
dapps_path: replace_home(&data_dir, "$BASE/dapps").into(),
extra_dapps: vec![],
extra_embed_on: vec![],
@ -158,7 +163,7 @@ pub fn new(configuration: Configuration, deps: Dependencies) -> Result<Option<Mi
server::dapps_middleware(
deps,
&configuration.ntp_server,
&configuration.ntp_servers,
configuration.dapps_path,
configuration.extra_dapps,
rpc::DAPPS_DOMAIN,
@ -166,14 +171,14 @@ pub fn new(configuration: Configuration, deps: Dependencies) -> Result<Option<Mi
).map(Some)
}
pub fn new_ui(enabled: bool, ntp_server: &str, deps: Dependencies) -> Result<Option<Middleware>, String> {
pub fn new_ui(enabled: bool, ntp_servers: &[String], deps: Dependencies) -> Result<Option<Middleware>, String> {
if !enabled {
return Ok(None);
}
server::ui_middleware(
deps,
ntp_server,
ntp_servers,
rpc::DAPPS_DOMAIN,
).map(Some)
}
@ -204,7 +209,7 @@ mod server {
pub fn dapps_middleware(
_deps: Dependencies,
_ntp_server: &str,
_ntp_servers: &[String],
_dapps_path: PathBuf,
_extra_dapps: Vec<PathBuf>,
_dapps_domain: &str,
@ -215,7 +220,7 @@ mod server {
pub fn ui_middleware(
_deps: Dependencies,
_ntp_server: &str,
_ntp_servers: &[String],
_dapps_domain: &str,
) -> Result<Middleware, String> {
Err("Your Parity version has been compiled without UI support.".into())
@ -241,7 +246,7 @@ mod server {
pub fn dapps_middleware(
deps: Dependencies,
ntp_server: &str,
ntp_servers: &[String],
dapps_path: PathBuf,
extra_dapps: Vec<PathBuf>,
dapps_domain: &str,
@ -252,7 +257,7 @@ mod server {
let web_proxy_tokens = Arc::new(move |token| signer.web_proxy_access_token_domain(&token));
Ok(parity_dapps::Middleware::dapps(
ntp_server,
ntp_servers,
deps.pool,
parity_remote,
deps.ui_address,
@ -269,12 +274,12 @@ mod server {
pub fn ui_middleware(
deps: Dependencies,
ntp_server: &str,
ntp_servers: &[String],
dapps_domain: &str,
) -> Result<Middleware, String> {
let parity_remote = parity_reactor::Remote::new(deps.remote.clone());
Ok(parity_dapps::Middleware::ui(
ntp_server,
ntp_servers,
deps.pool,
parity_remote,
dapps_domain,


@ -73,7 +73,7 @@ impl Default for HttpConfiguration {
#[derive(Debug, PartialEq, Clone)]
pub struct UiConfiguration {
pub enabled: bool,
pub ntp_server: String,
pub ntp_servers: Vec<String>,
pub interface: String,
pub port: u16,
pub hosts: Option<Vec<String>>,
@ -107,7 +107,12 @@ impl Default for UiConfiguration {
fn default() -> Self {
UiConfiguration {
enabled: true && cfg!(feature = "ui-enabled"),
ntp_server: "none".into(),
ntp_servers: vec![
"0.parity.pool.ntp.org:123".into(),
"1.parity.pool.ntp.org:123".into(),
"2.parity.pool.ntp.org:123".into(),
"3.parity.pool.ntp.org:123".into(),
],
port: 8180,
interface: "127.0.0.1".into(),
hosts: Some(vec![]),


@ -311,7 +311,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
};
let dapps_middleware = dapps::new(cmd.dapps_conf.clone(), dapps_deps.clone())?;
let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_server, dapps_deps)?;
let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_servers, dapps_deps)?;
// start RPCs
let dapps_service = dapps::service(&dapps_middleware);
@ -687,7 +687,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
}
};
let dapps_middleware = dapps::new(cmd.dapps_conf.clone(), dapps_deps.clone())?;
let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_server, dapps_deps)?;
let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_servers, dapps_deps)?;
let dapps_service = dapps::service(&dapps_middleware);
let deps_for_rpc_apis = Arc::new(rpc_apis::FullDependencies {


@ -13,4 +13,4 @@ log = "0.3"
serde_json = "1.0"
[dev-dependencies]
ethcore-util = { path = "../util" }
parking_lot = "0.4"


@ -26,6 +26,207 @@ extern crate log;
pub extern crate fetch;
mod price_info;
use std::cmp;
use std::fmt;
use std::io;
use std::io::Read;
pub use price_info::*;
use fetch::{Client as FetchClient, Fetch};
use futures::Future;
use serde_json::Value;
/// Current ETH price information.
#[derive(Debug)]
pub struct PriceInfo {
/// Current ETH price in USD.
pub ethusd: f32,
}
/// Price info error.
#[derive(Debug)]
pub enum Error {
/// The API returned an unexpected status code.
StatusCode(&'static str),
/// The API returned an unexpected status content.
UnexpectedResponse(String),
/// There was an error when trying to reach the API.
Fetch(fetch::Error),
/// IO error when reading API response.
Io(io::Error),
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Self { Error::Io(err) }
}
impl From<fetch::Error> for Error {
fn from(err: fetch::Error) -> Self { Error::Fetch(err) }
}
/// A client to get the current ETH price using an external API.
pub struct Client<F = FetchClient> {
api_endpoint: String,
fetch: F,
}
impl<F> fmt::Debug for Client<F> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("price_info::Client")
.field("api_endpoint", &self.api_endpoint)
.finish()
}
}
impl<F> cmp::PartialEq for Client<F> {
fn eq(&self, other: &Client<F>) -> bool {
self.api_endpoint == other.api_endpoint
}
}
impl<F: Fetch> Client<F> {
/// Creates a new instance of the `Client` given a `fetch::Client`.
pub fn new(fetch: F) -> Client<F> {
let api_endpoint = "http://api.etherscan.io/api?module=stats&action=ethprice".to_owned();
Client { api_endpoint, fetch }
}
/// Gets the current ETH price and calls `set_price` with the result.
pub fn get<G: Fn(PriceInfo) + Sync + Send + 'static>(&self, set_price: G) {
self.fetch.forget(self.fetch.fetch(&self.api_endpoint)
.map_err(|err| Error::Fetch(err))
.and_then(move |mut response| {
if !response.is_success() {
return Err(Error::StatusCode(response.status().canonical_reason().unwrap_or("unknown")));
}
let mut result = String::new();
response.read_to_string(&mut result)?;
let value: Option<Value> = serde_json::from_str(&result).ok();
let ethusd = value
.as_ref()
.and_then(|value| value.pointer("/result/ethusd"))
.and_then(|obj| obj.as_str())
.and_then(|s| s.parse().ok());
match ethusd {
Some(ethusd) => {
set_price(PriceInfo { ethusd });
Ok(())
},
None => Err(Error::UnexpectedResponse(result)),
}
})
.map_err(|err| {
warn!("Failed to auto-update latest ETH price: {:?}", err);
err
})
);
}
}
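A hypothetical usage sketch for the client defined above (not part of the commit; it assumes the crate layout shown in this diff, with the `price-info` crate exporting `Client` and `PriceInfo` and the `fetch` crate providing `Client as FetchClient`):

```rust
// Illustrative only: build the price-info client with the real fetch client
// and print the ETH/USD price when the callback fires.
extern crate fetch;
extern crate price_info;

use fetch::{Client as FetchClient, Fetch};
use price_info::{Client, PriceInfo};

fn main() {
    // `Fetch::new()` returns a Result, matching the trait used in the tests above.
    let fetch = FetchClient::new().expect("unable to create fetch client");
    let client = Client::new(fetch);

    // `get` takes a `Fn(PriceInfo) + Sync + Send + 'static` callback.
    client.get(|price: PriceInfo| println!("1 ETH = {} USD", price.ethusd));
}
```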
#[cfg(test)]
mod test {
extern crate parking_lot;
use self::parking_lot::Mutex;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use fetch;
use fetch::Fetch;
use futures;
use futures::future::{Future, FutureResult};
use Client;
#[derive(Clone)]
struct FakeFetch(Option<String>, Arc<Mutex<u64>>);
impl Fetch for FakeFetch {
type Result = FutureResult<fetch::Response, fetch::Error>;
fn new() -> Result<Self, fetch::Error> where Self: Sized { Ok(FakeFetch(None, Default::default())) }
fn fetch_with_abort(&self, url: &str, _abort: fetch::Abort) -> Self::Result {
assert_eq!(url, "http://api.etherscan.io/api?module=stats&action=ethprice");
let mut val = self.1.lock();
*val = *val + 1;
if let Some(ref response) = self.0 {
let data = ::std::io::Cursor::new(response.clone());
futures::future::ok(fetch::Response::from_reader(data))
} else {
futures::future::ok(fetch::Response::not_found())
}
}
// this guarantees that the calls to price_info::Client::get will block for execution
fn forget<F, I, E>(&self, f: F) where
F: Future<Item=I, Error=E> + Send + 'static,
I: Send + 'static,
E: Send + 'static {
let _ = f.wait();
}
}
fn price_info_ok(response: &str) -> Client<FakeFetch> {
Client::new(FakeFetch(Some(response.to_owned()), Default::default()))
}
fn price_info_not_found() -> Client<FakeFetch> {
Client::new(FakeFetch::new().unwrap())
}
#[test]
fn should_get_price_info() {
// given
let response = r#"{
"status": "1",
"message": "OK",
"result": {
"ethbtc": "0.0891",
"ethbtc_timestamp": "1499894236",
"ethusd": "209.55",
"ethusd_timestamp": "1499894229"
}
}"#;
let price_info = price_info_ok(response);
// when
price_info.get(|price| {
// then
assert_eq!(price.ethusd, 209.55);
});
}
#[test]
fn should_not_call_set_price_if_response_is_malformed() {
// given
let response = "{}";
let price_info = price_info_ok(response);
let b = Arc::new(AtomicBool::new(false));
// when
let bb = b.clone();
price_info.get(move |_| {
bb.store(true, Ordering::Relaxed);
});
// then
assert_eq!(b.load(Ordering::Relaxed), false);
}
#[test]
fn should_not_call_set_price_if_response_is_invalid() {
// given
let price_info = price_info_not_found();
let b = Arc::new(AtomicBool::new(false));
// when
let bb = b.clone();
price_info.get(move |_| {
bb.store(true, Ordering::Relaxed);
});
// then
assert_eq!(b.load(Ordering::Relaxed), false);
}
}


@ -1,222 +0,0 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::cmp;
use std::fmt;
use std::io;
use std::io::Read;
use std::str::FromStr;
use fetch;
use fetch::{Client as FetchClient, Fetch};
use futures::Future;
use serde_json;
use serde_json::Value;
/// Current ETH price information.
#[derive(Debug)]
pub struct PriceInfo {
/// Current ETH price in USD.
pub ethusd: f32,
}
/// Price info error.
#[derive(Debug)]
pub enum Error {
/// The API returned an unexpected status code or content.
UnexpectedResponse(&'static str, String),
/// There was an error when trying to reach the API.
Fetch(fetch::Error),
/// IO error when reading API response.
Io(io::Error),
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Self { Error::Io(err) }
}
impl From<fetch::Error> for Error {
fn from(err: fetch::Error) -> Self { Error::Fetch(err) }
}
/// A client to get the current ETH price using an external API.
pub struct Client<F = FetchClient> {
api_endpoint: String,
fetch: F,
}
impl<F> fmt::Debug for Client<F> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("price_info::Client")
.field("api_endpoint", &self.api_endpoint)
.finish()
}
}
impl<F> cmp::PartialEq for Client<F> {
fn eq(&self, other: &Client<F>) -> bool {
self.api_endpoint == other.api_endpoint
}
}
impl<F: Fetch> Client<F> {
/// Creates a new instance of the `Client` given a `fetch::Client`.
pub fn new(fetch: F) -> Client<F> {
let api_endpoint = "http://api.etherscan.io/api?module=stats&action=ethprice".to_owned();
Client { api_endpoint, fetch }
}
/// Gets the current ETH price and calls `set_price` with the result.
pub fn get<G: Fn(PriceInfo) + Sync + Send + 'static>(&self, set_price: G) {
self.fetch.forget(self.fetch.fetch(&self.api_endpoint)
.map_err(|err| Error::Fetch(err))
.and_then(move |mut response| {
let mut result = String::new();
response.read_to_string(&mut result)?;
if response.is_success() {
let value: Result<Value, _> = serde_json::from_str(&result);
if let Ok(v) = value {
let obj = v.pointer("/result/ethusd").and_then(|obj| {
match *obj {
Value::String(ref s) => FromStr::from_str(s).ok(),
_ => None,
}
});
if let Some(ethusd) = obj {
set_price(PriceInfo { ethusd });
return Ok(());
}
}
}
let status = response.status().canonical_reason().unwrap_or("unknown");
Err(Error::UnexpectedResponse(status, result))
})
.map_err(|err| {
warn!("Failed to auto-update latest ETH price: {:?}", err);
err
})
);
}
}
#[cfg(test)]
mod test {
extern crate ethcore_util as util;
use self::util::Mutex;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use fetch;
use fetch::Fetch;
use futures;
use futures::future::{Future, FutureResult};
use price_info::Client;
#[derive(Clone)]
struct FakeFetch(Option<String>, Arc<Mutex<u64>>);
impl Fetch for FakeFetch {
type Result = FutureResult<fetch::Response, fetch::Error>;
fn new() -> Result<Self, fetch::Error> where Self: Sized { Ok(FakeFetch(None, Default::default())) }
fn fetch_with_abort(&self, url: &str, _abort: fetch::Abort) -> Self::Result {
assert_eq!(url, "http://api.etherscan.io/api?module=stats&action=ethprice");
let mut val = self.1.lock();
*val = *val + 1;
if let Some(ref response) = self.0 {
let data = ::std::io::Cursor::new(response.clone());
futures::future::ok(fetch::Response::from_reader(data))
} else {
futures::future::ok(fetch::Response::not_found())
}
}
// this guarantees that the calls to price_info::Client::get will block for execution
fn forget<F, I, E>(&self, f: F) where
F: Future<Item=I, Error=E> + Send + 'static,
I: Send + 'static,
E: Send + 'static {
let _ = f.wait();
}
}
fn price_info_ok(response: &str) -> Client<FakeFetch> {
Client::new(FakeFetch(Some(response.to_owned()), Default::default()))
}
fn price_info_not_found() -> Client<FakeFetch> {
Client::new(FakeFetch::new().unwrap())
}
#[test]
fn should_get_price_info() {
// given
let response = r#"{
"status": "1",
"message": "OK",
"result": {
"ethbtc": "0.0891",
"ethbtc_timestamp": "1499894236",
"ethusd": "209.55",
"ethusd_timestamp": "1499894229"
}
}"#;
let price_info = price_info_ok(response);
// when
price_info.get(|price| {
// then
assert_eq!(price.ethusd, 209.55);
});
}
#[test]
fn should_not_call_set_price_if_response_is_malformed() {
// given
let response = "{}";
let price_info = price_info_ok(response);
let b = Arc::new(AtomicBool::new(false));
// when
let bb = b.clone();
price_info.get(move |_| {
bb.store(true, Ordering::Relaxed);
});
// then
assert_eq!(b.load(Ordering::Relaxed), false);
}
#[test]
fn should_not_call_set_price_if_response_is_invalid() {
// given
let price_info = price_info_not_found();
let b = Arc::new(AtomicBool::new(false));
// when
let bb = b.clone();
price_info.get(move |_| {
bb.store(true, Ordering::Relaxed);
});
// then
assert_eq!(b.load(Ordering::Relaxed), false);
}
}


@ -35,3 +35,4 @@ ethcore-logger = { path = "../logger" }
ethcrypto = { path = "../ethcrypto" }
ethkey = { path = "../ethkey" }
native-contracts = { path = "../ethcore/native_contracts" }
lazy_static = "0.2"


@ -14,12 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::sync::{Arc, Weak};
use futures::{future, Future};
use parking_lot::Mutex;
use ethkey::public_to_address;
use ethcore::client::{Client, BlockChainClient, BlockId};
use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify};
use native_contracts::SecretStoreAclStorage;
use util::{H256, Address, Bytes};
use types::all::{Error, ServerKeyId, Public};
const ACL_CHECKER_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_acl_checker";
@ -32,40 +33,82 @@ pub trait AclStorage: Send + Sync {
/// On-chain ACL storage implementation.
pub struct OnChainAclStorage {
/// Cached on-chain contract.
contract: Mutex<CachedContract>,
}
/// Cached on-chain ACL storage contract.
struct CachedContract {
/// Blockchain client.
client: Arc<Client>,
/// On-chain contract.
contract: Mutex<Option<SecretStoreAclStorage>>,
client: Weak<Client>,
/// Contract address.
contract_addr: Option<Address>,
/// Contract at given address.
contract: Option<SecretStoreAclStorage>,
}
impl OnChainAclStorage {
pub fn new(client: Arc<Client>) -> Self {
OnChainAclStorage {
client: client,
contract: Mutex::new(None),
}
pub fn new(client: &Arc<Client>) -> Arc<Self> {
let acl_storage = Arc::new(OnChainAclStorage {
contract: Mutex::new(CachedContract::new(client)),
});
client.add_notify(acl_storage.clone());
acl_storage
}
}
impl AclStorage for OnChainAclStorage {
fn check(&self, public: &Public, document: &ServerKeyId) -> Result<bool, Error> {
let mut contract = self.contract.lock();
if !contract.is_some() {
*contract = self.client.registry_address(ACL_CHECKER_CONTRACT_REGISTRY_NAME.to_owned())
.and_then(|contract_addr| {
self.contract.lock().check(public, document)
}
}
impl ChainNotify for OnChainAclStorage {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: u64) {
if !enacted.is_empty() || !retracted.is_empty() {
self.contract.lock().update()
}
}
}
impl CachedContract {
pub fn new(client: &Arc<Client>) -> Self {
CachedContract {
client: Arc::downgrade(client),
contract_addr: None,
contract: None,
}
}
pub fn update(&mut self) {
if let Some(client) = self.client.upgrade() {
let new_contract_addr = client.registry_address(ACL_CHECKER_CONTRACT_REGISTRY_NAME.to_owned());
if self.contract_addr.as_ref() != new_contract_addr.as_ref() {
self.contract = new_contract_addr.map(|contract_addr| {
trace!(target: "secretstore", "Configuring for ACL checker contract from {}", contract_addr);
Some(SecretStoreAclStorage::new(contract_addr))
})
SecretStoreAclStorage::new(contract_addr)
});
self.contract_addr = new_contract_addr;
}
}
if let Some(ref contract) = *contract {
let address = public_to_address(&public);
let do_call = |a, d| future::done(self.client.call_contract(BlockId::Latest, a, d));
contract.check_permissions(do_call, address, document.clone())
.map_err(|err| Error::Internal(err))
.wait()
} else {
Err(Error::Internal("ACL checker contract is not configured".to_owned()))
}
pub fn check(&mut self, public: &Public, document: &ServerKeyId) -> Result<bool, Error> {
match self.contract.as_ref() {
Some(contract) => {
let address = public_to_address(&public);
let do_call = |a, d| future::done(
self.client
.upgrade()
.ok_or("Calling contract without client".into())
.and_then(|c| c.call_contract(BlockId::Latest, a, d)));
contract.check_permissions(do_call, address, document.clone())
.map_err(|err| Error::Internal(err))
.wait()
},
None => Err(Error::Internal("ACL checker contract is not configured".to_owned())),
}
}
}


@ -24,6 +24,7 @@ use ethcrypto;
use ethkey;
use super::acl_storage::AclStorage;
use super::key_storage::KeyStorage;
use super::key_server_set::KeyServerSet;
use key_server_cluster::{math, ClusterCore};
use traits::{ServerKeyGenerator, DocumentKeyServer, MessageSigner, KeyServer};
use types::all::{Error, Public, RequestSignature, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow,
@ -44,9 +45,9 @@ pub struct KeyServerCore {
impl KeyServerImpl {
/// Create new key server instance
pub fn new(config: &ClusterConfiguration, acl_storage: Arc<AclStorage>, key_storage: Arc<KeyStorage>) -> Result<Self, Error> {
pub fn new(config: &ClusterConfiguration, key_server_set: Arc<KeyServerSet>, acl_storage: Arc<AclStorage>, key_storage: Arc<KeyStorage>) -> Result<Self, Error> {
Ok(KeyServerImpl {
data: Arc::new(Mutex::new(KeyServerCore::new(config, acl_storage, key_storage)?)),
data: Arc::new(Mutex::new(KeyServerCore::new(config, key_server_set, acl_storage, key_storage)?)),
})
}
@ -143,14 +144,12 @@ impl MessageSigner for KeyServerImpl {
}
impl KeyServerCore {
pub fn new(config: &ClusterConfiguration, acl_storage: Arc<AclStorage>, key_storage: Arc<KeyStorage>) -> Result<Self, Error> {
pub fn new(config: &ClusterConfiguration, key_server_set: Arc<KeyServerSet>, acl_storage: Arc<AclStorage>, key_storage: Arc<KeyStorage>) -> Result<Self, Error> {
let config = NetClusterConfiguration {
threads: config.threads,
self_key_pair: ethkey::KeyPair::from_secret_slice(&config.self_private)?,
listen_address: (config.listener_address.address.clone(), config.listener_address.port),
nodes: config.nodes.iter()
.map(|(node_id, node_address)| (node_id.clone(), (node_address.address.clone(), node_address.port)))
.collect(),
key_server_set: key_server_set,
allow_connecting_to_higher_nodes: config.allow_connecting_to_higher_nodes,
acl_storage: acl_storage,
key_storage: key_storage,
@ -193,10 +192,13 @@ impl Drop for KeyServerCore {
pub mod tests {
use std::time;
use std::sync::Arc;
use std::net::SocketAddr;
use std::collections::BTreeMap;
use ethcrypto;
use ethkey::{self, Secret, Random, Generator};
use acl_storage::tests::DummyAclStorage;
use key_storage::tests::DummyKeyStorage;
use key_server_set::tests::MapKeyServerSet;
use key_server_cluster::math;
use util::H256;
use types::all::{Error, Public, ClusterConfiguration, NodeAddress, RequestSignature, ServerKeyId,
@ -254,8 +256,11 @@ pub mod tests {
})).collect(),
allow_connecting_to_higher_nodes: false,
}).collect();
let key_servers_set: BTreeMap<Public, SocketAddr> = configs[0].nodes.iter()
.map(|(k, a)| (k.clone(), format!("{}:{}", a.address, a.port).parse().unwrap()))
.collect();
let key_servers: Vec<_> = configs.into_iter().map(|cfg|
KeyServerImpl::new(&cfg, Arc::new(DummyAclStorage::default()), Arc::new(DummyKeyStorage::default())).unwrap()
KeyServerImpl::new(&cfg, Arc::new(MapKeyServerSet::new(key_servers_set.clone())), Arc::new(DummyAclStorage::default()), Arc::new(DummyKeyStorage::default())).unwrap()
).collect();
// wait until connections are established. It is fast => do not bother with events here


@ -28,7 +28,7 @@ use tokio_core::reactor::{Handle, Remote, Interval};
use tokio_core::net::{TcpListener, TcpStream};
use ethkey::{Public, KeyPair, Signature, Random, Generator};
use util::H256;
use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage};
use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage, KeyServerSet};
use key_server_cluster::cluster_sessions::{ClusterSession, ClusterSessions, GenerationSessionWrapper, EncryptionSessionWrapper,
DecryptionSessionWrapper, SigningSessionWrapper};
use key_server_cluster::message::{self, Message, ClusterMessage, GenerationMessage, EncryptionMessage, DecryptionMessage,
@ -102,8 +102,8 @@ pub struct ClusterConfiguration {
pub self_key_pair: KeyPair,
/// Interface to listen to.
pub listen_address: (String, u16),
/// Cluster nodes.
pub nodes: BTreeMap<NodeId, (String, u16)>,
/// Cluster nodes set.
pub key_server_set: Arc<KeyServerSet>,
/// Reference to key storage
pub key_storage: Arc<KeyStorage>,
/// Reference to ACL storage
@ -158,9 +158,17 @@ pub struct ClusterConnections {
/// Self node id.
pub self_node_id: NodeId,
/// All known other key servers.
pub nodes: BTreeMap<NodeId, SocketAddr>,
pub key_server_set: Arc<KeyServerSet>,
/// Connections data.
pub data: RwLock<ClusterConnectionsData>,
}
/// Cluster connections data.
pub struct ClusterConnectionsData {
/// Active key servers set.
pub nodes: BTreeMap<Public, SocketAddr>,
/// Active connections to key servers.
pub connections: RwLock<BTreeMap<NodeId, Arc<Connection>>>,
pub connections: BTreeMap<NodeId, Arc<Connection>>,
}
/// Cluster view core.
@ -281,8 +289,7 @@ impl ClusterCore {
/// Accept connection future.
fn accept_connection_future(handle: &Handle, data: Arc<ClusterData>, stream: TcpStream, node_address: SocketAddr) -> BoxedEmptyFuture {
let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect();
net_accept_connection(node_address, stream, handle, data.self_key_pair.clone(), disconnected_nodes)
net_accept_connection(node_address, stream, handle, data.self_key_pair.clone())
.then(move |result| ClusterCore::process_connection_result(data, true, result))
.then(|_| finished(()))
.boxed()
@ -354,6 +361,7 @@ impl ClusterCore {
/// Try to connect to every disconnected node.
fn connect_disconnected_nodes(data: Arc<ClusterData>) {
data.connections.update_nodes_set();
for (node_id, node_address) in data.connections.disconnected_nodes() {
if data.config.allow_connecting_to_higher_nodes || data.self_key_pair.public() < &node_id {
ClusterCore::connect(data.clone(), node_address);
@ -372,14 +380,16 @@ impl ClusterCore {
finished(Ok(())).boxed()
}
},
Ok(DeadlineStatus::Meet(Err(_))) => {
Ok(DeadlineStatus::Meet(Err(err))) => {
warn!(target: "secretstore_net", "{}: protocol error {} when establishind connection", data.self_key_pair.public(), err);
finished(Ok(())).boxed()
},
Ok(DeadlineStatus::Timeout) => {
warn!(target: "secretstore_net", "{}: timeout when establishind connection", data.self_key_pair.public());
finished(Ok(())).boxed()
},
Err(_) => {
// network error
Err(err) => {
warn!(target: "secretstore_net", "{}: network error {} when establishind connection", data.self_key_pair.public(), err);
finished(Ok(())).boxed()
},
}
@ -665,33 +675,38 @@ impl ClusterCore {
impl ClusterConnections {
pub fn new(config: &ClusterConfiguration) -> Result<Self, Error> {
let mut connections = ClusterConnections {
let mut nodes = config.key_server_set.get();
nodes.remove(config.self_key_pair.public());
Ok(ClusterConnections {
self_node_id: config.self_key_pair.public().clone(),
nodes: BTreeMap::new(),
connections: RwLock::new(BTreeMap::new()),
};
for (node_id, &(ref node_addr, node_port)) in config.nodes.iter().filter(|&(node_id, _)| node_id != config.self_key_pair.public()) {
let socket_address = make_socket_address(&node_addr, node_port)?;
connections.nodes.insert(node_id.clone(), socket_address);
}
Ok(connections)
key_server_set: config.key_server_set.clone(),
data: RwLock::new(ClusterConnectionsData {
nodes: nodes,
connections: BTreeMap::new(),
}),
})
}
pub fn cluster_state(&self) -> ClusterState {
ClusterState {
connected: self.connections.read().keys().cloned().collect(),
connected: self.data.read().connections.keys().cloned().collect(),
}
}
pub fn get(&self, node: &NodeId) -> Option<Arc<Connection>> {
self.connections.read().get(node).cloned()
self.data.read().connections.get(node).cloned()
}
pub fn insert(&self, connection: Arc<Connection>) -> bool {
let mut connections = self.connections.write();
if connections.contains_key(connection.node_id()) {
let mut data = self.data.write();
if !data.nodes.contains_key(connection.node_id()) {
// incoming connections are checked here
trace!(target: "secretstore_net", "{}: ignoring unknown connection from {} at {}", self.self_node_id, connection.node_id(), connection.node_address());
debug_assert!(connection.is_inbound());
return false;
}
if data.connections.contains_key(connection.node_id()) {
// we have already connected to the same node
// the agreement is that node with lower id must establish connection to node with higher id
if (&self.self_node_id < connection.node_id() && connection.is_inbound())
@ -700,14 +715,15 @@ impl ClusterConnections {
}
}
trace!(target: "secretstore_net", "{}: inserting connection to {} at {}", self.self_node_id, connection.node_id(), connection.node_address());
connections.insert(connection.node_id().clone(), connection);
trace!(target: "secretstore_net", "{}: inserting connection to {} at {}. Connected to {} of {} nodes",
self.self_node_id, connection.node_id(), connection.node_address(), data.connections.len() + 1, data.nodes.len());
data.connections.insert(connection.node_id().clone(), connection);
true
}
pub fn remove(&self, node: &NodeId, is_inbound: bool) {
let mut connections = self.connections.write();
if let Entry::Occupied(entry) = connections.entry(node.clone()) {
let mut data = self.data.write();
if let Entry::Occupied(entry) = data.connections.entry(node.clone()) {
if entry.get().is_inbound() != is_inbound {
return;
}
@ -718,20 +734,64 @@ impl ClusterConnections {
}
pub fn connected_nodes(&self) -> BTreeSet<NodeId> {
self.connections.read().keys().cloned().collect()
self.data.read().connections.keys().cloned().collect()
}
pub fn active_connections(&self)-> Vec<Arc<Connection>> {
self.connections.read().values().cloned().collect()
self.data.read().connections.values().cloned().collect()
}
pub fn disconnected_nodes(&self) -> BTreeMap<NodeId, SocketAddr> {
let connections = self.connections.read();
self.nodes.iter()
.filter(|&(node_id, _)| !connections.contains_key(node_id))
let data = self.data.read();
data.nodes.iter()
.filter(|&(node_id, _)| !data.connections.contains_key(node_id))
.map(|(node_id, node_address)| (node_id.clone(), node_address.clone()))
.collect()
}
pub fn update_nodes_set(&self) {
let mut data = self.data.write();
let mut new_nodes = self.key_server_set.get();
// we do not need to connect to self
// + we do not need to try to connect to any other node if we are not part of the cluster
if new_nodes.remove(&self.self_node_id).is_none() {
new_nodes.clear();
}
let mut num_added_nodes = 0;
let mut num_removed_nodes = 0;
let mut num_changed_nodes = 0;
for obsolete_node in data.nodes.keys().cloned().collect::<Vec<_>>() {
if !new_nodes.contains_key(&obsolete_node) {
if let Entry::Occupied(entry) = data.connections.entry(obsolete_node) {
trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address());
entry.remove();
}
data.nodes.remove(&obsolete_node);
num_removed_nodes += 1;
}
}
for (new_node_public, new_node_addr) in new_nodes {
match data.nodes.insert(new_node_public, new_node_addr) {
None => num_added_nodes += 1,
Some(old_node_addr) => if new_node_addr != old_node_addr {
if let Entry::Occupied(entry) = data.connections.entry(new_node_public) {
trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address());
entry.remove();
}
num_changed_nodes += 1;
},
}
}
if num_added_nodes != 0 || num_removed_nodes != 0 || num_changed_nodes != 0 {
trace!(target: "secretstore_net", "{}: updated nodes set: removed {}, added {}, changed {}. Connected to {} of {} nodes",
self.self_node_id, num_removed_nodes, num_added_nodes, num_changed_nodes, data.connections.len(), data.nodes.len());
}
}
}
impl ClusterData {
@ -929,7 +989,7 @@ pub mod tests {
use parking_lot::Mutex;
use tokio_core::reactor::Core;
use ethkey::{Random, Generator, Public};
use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage};
use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage, MapKeyServerSet};
use key_server_cluster::message::Message;
use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration};
use key_server_cluster::generation_session::{Session as GenerationSession, SessionState as GenerationSessionState};
@ -999,7 +1059,7 @@ pub mod tests {
}
pub fn all_connections_established(cluster: &Arc<ClusterCore>) -> bool {
cluster.config().nodes.keys()
cluster.config().key_server_set.get().keys()
.filter(|p| *p != cluster.config().self_key_pair.public())
.all(|p| cluster.connection(p).is_some())
}
@ -1010,9 +1070,9 @@ pub mod tests {
threads: 1,
self_key_pair: key_pairs[i].clone(),
listen_address: ("127.0.0.1".to_owned(), ports_begin + i as u16),
nodes: key_pairs.iter().enumerate()
.map(|(j, kp)| (kp.public().clone(), ("127.0.0.1".into(), ports_begin + j as u16)))
.collect(),
key_server_set: Arc::new(MapKeyServerSet::new(key_pairs.iter().enumerate()
.map(|(j, kp)| (kp.public().clone(), format!("127.0.0.1:{}", ports_begin + j as u16).parse().unwrap()))
.collect())),
allow_connecting_to_higher_nodes: false,
key_storage: Arc::new(DummyKeyStorage::default()),
acl_storage: Arc::new(DummyAclStorage::default()),
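Note on the update_nodes_set logic added in this file: it reconciles the cached node map against the latest KeyServerSet::get() snapshot, dropping nodes (and their connections) that left the set, adding new ones, and tearing down a connection whose address changed so it can be re-established. A minimal standalone sketch of that reconciliation follows; the String node id and the reconcile/main names are illustrative stand-ins, and the connection bookkeeping is omitted.

use std::collections::BTreeMap;
use std::net::SocketAddr;

type NodeId = String; // stand-in for the real Public-based node id

// Reconcile the cached node map with a fresh snapshot; returns (added, removed, changed).
fn reconcile(current: &mut BTreeMap<NodeId, SocketAddr>, new_set: BTreeMap<NodeId, SocketAddr>) -> (usize, usize, usize) {
	let (mut added, mut removed, mut changed) = (0, 0, 0);
	// drop nodes that are no longer part of the set
	for obsolete in current.keys().cloned().collect::<Vec<_>>() {
		if !new_set.contains_key(&obsolete) {
			current.remove(&obsolete);
			removed += 1;
		}
	}
	// insert new nodes and detect address changes for existing ones
	for (node, addr) in new_set {
		match current.insert(node, addr) {
			None => added += 1,
			Some(old_addr) => if old_addr != addr { changed += 1 },
		}
	}
	(added, removed, changed)
}

fn main() {
	let mut current: BTreeMap<NodeId, SocketAddr> = BTreeMap::new();
	current.insert("node1".into(), "127.0.0.1:8083".parse().unwrap());
	let new_set: BTreeMap<NodeId, SocketAddr> =
		vec![("node2".into(), "127.0.0.1:8084".parse().unwrap())].into_iter().collect();
	// node1 disappears from the set, node2 is new
	assert_eq!(reconcile(&mut current, new_set), (1, 1, 0));
}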

View File

@ -135,7 +135,7 @@ impl ClusterSessions {
pub fn new(config: &ClusterConfiguration) -> Self {
ClusterSessions {
self_node_id: config.self_key_pair.public().clone(),
nodes: config.nodes.keys().cloned().collect(),
nodes: config.key_server_set.get().keys().cloned().collect(),
acl_storage: config.acl_storage.clone(),
key_storage: config.key_storage.clone(),
generation_sessions: ClusterSessionsContainer::new(),

View File

@ -45,7 +45,7 @@ pub fn handshake_with_plain_confirmation<A>(a: A, self_confirmation_plain: Resul
state: state,
self_key_pair: self_key_pair,
self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()),
trusted_nodes: trusted_nodes,
trusted_nodes: Some(trusted_nodes),
other_node_id: None,
other_confirmation_plain: None,
shared_key: None,
@ -53,7 +53,7 @@ pub fn handshake_with_plain_confirmation<A>(a: A, self_confirmation_plain: Resul
}
/// Wait for handshake procedure to be started by another node from the cluster.
pub fn accept_handshake<A>(a: A, self_key_pair: KeyPair, trusted_nodes: BTreeSet<NodeId>) -> Handshake<A> where A: AsyncWrite + AsyncRead {
pub fn accept_handshake<A>(a: A, self_key_pair: KeyPair) -> Handshake<A> where A: AsyncWrite + AsyncRead {
let self_confirmation_plain = Random.generate().map(|kp| *kp.secret().clone()).map_err(Into::into);
let (error, state) = match self_confirmation_plain.clone() {
Ok(_) => (None, HandshakeState::ReceivePublicKey(read_message(a))),
@ -66,7 +66,7 @@ pub fn accept_handshake<A>(a: A, self_key_pair: KeyPair, trusted_nodes: BTreeSet
state: state,
self_key_pair: self_key_pair,
self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()),
trusted_nodes: trusted_nodes,
trusted_nodes: None,
other_node_id: None,
other_confirmation_plain: None,
shared_key: None,
@ -89,7 +89,7 @@ pub struct Handshake<A> {
state: HandshakeState<A>,
self_key_pair: KeyPair,
self_confirmation_plain: H256,
trusted_nodes: BTreeSet<NodeId>,
trusted_nodes: Option<BTreeSet<NodeId>>,
other_node_id: Option<NodeId>,
other_confirmation_plain: Option<H256>,
shared_key: Option<KeyPair>,
@ -172,7 +172,7 @@ impl<A> Future for Handshake<A> where A: AsyncRead + AsyncWrite {
Err(err) => return Ok((stream, Err(err.into())).into()),
};
if !self.trusted_nodes.contains(&*message.node_id) {
if !self.trusted_nodes.as_ref().map(|tn| tn.contains(&*message.node_id)).unwrap_or(true) {
return Ok((stream, Err(Error::InvalidNodeId)).into());
}
@ -300,7 +300,7 @@ mod tests {
let trusted_nodes: BTreeSet<_> = vec![io.peer_public().clone()].into_iter().collect();
let shared_key = compute_shared_key(self_key_pair.secret(), trusted_nodes.iter().nth(0).unwrap()).unwrap();
let mut handshake = accept_handshake(io, self_key_pair, trusted_nodes);
let mut handshake = accept_handshake(io, self_key_pair);
handshake.set_self_confirmation_plain(self_confirmation_plain);
let handshake_result = handshake.wait().unwrap();
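With trusted_nodes now an Option<BTreeSet<NodeId>>, the accepting side no longer filters peers during the handshake itself; None means any node id is accepted at this stage, and unknown peers are rejected later in ClusterConnections::insert. A tiny sketch of the check used above (the is_trusted helper and the u64 node id are illustrative, not part of the commit):

use std::collections::BTreeSet;

type NodeId = u64; // illustrative stand-in for the real Public-based node id

// Mirrors the check in the handshake future: None means "trust everyone at this stage".
fn is_trusted(trusted_nodes: &Option<BTreeSet<NodeId>>, peer: &NodeId) -> bool {
	trusted_nodes.as_ref().map(|tn| tn.contains(peer)).unwrap_or(true)
}

fn main() {
	let whitelist: BTreeSet<NodeId> = [1, 2, 3].iter().cloned().collect();
	assert!(is_trusted(&Some(whitelist.clone()), &2)); // outbound handshake: peer is known
	assert!(!is_trusted(&Some(whitelist), &4));        // outbound handshake: unknown peer rejected
	assert!(is_trusted(&None, &4));                    // accepting side: check deferred to ClusterConnections::insert
}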

View File

@ -299,22 +299,22 @@ impl<Executor, Transport> JobSession<Executor, Transport> where Executor: JobExe
return Err(Error::ConsensusUnreachable);
}
let active_data = self.data.active_data.as_mut()
.expect("we have checked that we are on master node; on master nodes active_data is filled during initialization; qed");
if active_data.rejects.contains(node) {
return Ok(());
}
if active_data.requests.remove(node) || active_data.responses.remove(node).is_some() {
active_data.rejects.insert(node.clone());
if self.data.state == JobSessionState::Finished && active_data.responses.len() < self.meta.threshold + 1 {
self.data.state = JobSessionState::Active;
}
if active_data.requests.len() + active_data.responses.len() >= self.meta.threshold + 1 {
if let Some(active_data) = self.data.active_data.as_mut() {
if active_data.rejects.contains(node) {
return Ok(());
}
if active_data.requests.remove(node) || active_data.responses.remove(node).is_some() {
active_data.rejects.insert(node.clone());
if self.data.state == JobSessionState::Finished && active_data.responses.len() < self.meta.threshold + 1 {
self.data.state = JobSessionState::Active;
}
if active_data.requests.len() + active_data.responses.len() >= self.meta.threshold + 1 {
return Ok(());
}
self.data.state = JobSessionState::Failed;
return Err(Error::ConsensusUnreachable);
self.data.state = JobSessionState::Failed;
return Err(Error::ConsensusUnreachable);
}
}
Ok(())

View File

@ -23,6 +23,7 @@ use super::types::all::ServerKeyId;
pub use super::types::all::{NodeId, EncryptedDocumentKeyShadow};
pub use super::acl_storage::AclStorage;
pub use super::key_storage::{KeyStorage, DocumentKeyShare};
pub use super::key_server_set::KeyServerSet;
pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic, SerializableMessageHash};
pub use self::cluster::{ClusterCore, ClusterConfiguration, ClusterClient};
pub use self::generation_session::Session as GenerationSession;
@ -33,6 +34,8 @@ pub use self::decryption_session::Session as DecryptionSession;
pub use super::key_storage::tests::DummyKeyStorage;
#[cfg(test)]
pub use super::acl_storage::tests::DummyAclStorage;
#[cfg(test)]
pub use super::key_server_set::tests::MapKeyServerSet;
pub type SessionId = ServerKeyId;

View File

@ -17,19 +17,18 @@
use std::io;
use std::net::SocketAddr;
use std::time::Duration;
use std::collections::BTreeSet;
use futures::{Future, Poll};
use tokio_core::reactor::Handle;
use tokio_core::net::TcpStream;
use ethkey::KeyPair;
use key_server_cluster::{Error, NodeId};
use key_server_cluster::Error;
use key_server_cluster::io::{accept_handshake, Handshake, Deadline, deadline};
use key_server_cluster::net::Connection;
/// Create future for accepting incoming connection.
pub fn accept_connection(address: SocketAddr, stream: TcpStream, handle: &Handle, self_key_pair: KeyPair, trusted_nodes: BTreeSet<NodeId>) -> Deadline<AcceptConnection> {
pub fn accept_connection(address: SocketAddr, stream: TcpStream, handle: &Handle, self_key_pair: KeyPair) -> Deadline<AcceptConnection> {
let accept = AcceptConnection {
handshake: accept_handshake(stream, self_key_pair, trusted_nodes),
handshake: accept_handshake(stream, self_key_pair),
address: address,
};

View File

@ -0,0 +1,204 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::{Arc, Weak};
use std::net::SocketAddr;
use std::collections::BTreeMap;
use futures::{future, Future};
use parking_lot::Mutex;
use ethcore::filter::Filter;
use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify};
use native_contracts::KeyServerSet as KeyServerSetContract;
use util::{H256, Address, Bytes, Hashable};
use types::all::{Error, Public, NodeAddress};
const KEY_SERVER_SET_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_server_set";
/// Key server has been added to the set.
const ADDED_EVENT_NAME: &'static [u8] = &*b"KeyServerAdded(address)";
/// Key server has been removed from the set.
const REMOVED_EVENT_NAME: &'static [u8] = &*b"KeyServerRemoved(address)";
lazy_static! {
static ref ADDED_EVENT_NAME_HASH: H256 = ADDED_EVENT_NAME.sha3();
static ref REMOVED_EVENT_NAME_HASH: H256 = REMOVED_EVENT_NAME.sha3();
}
/// Key Server set
pub trait KeyServerSet: Send + Sync {
/// Get set of configured key servers
fn get(&self) -> BTreeMap<Public, SocketAddr>;
}
/// On-chain Key Server set implementation.
pub struct OnChainKeyServerSet {
/// Cached on-chain contract.
contract: Mutex<CachedContract>,
}
/// Cached on-chain Key Server set contract.
struct CachedContract {
/// Blockchain client.
client: Weak<Client>,
/// Contract address.
contract_addr: Option<Address>,
/// Active set of key servers.
key_servers: BTreeMap<Public, SocketAddr>,
}
impl OnChainKeyServerSet {
pub fn new(client: &Arc<Client>, key_servers: BTreeMap<Public, NodeAddress>) -> Result<Arc<Self>, Error> {
let mut cached_contract = CachedContract::new(client, key_servers)?;
let key_server_contract_address = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned());
// only initialize from the contract if it is installed; otherwise use the default nodes
// once the contract is installed, all default nodes are lost (unless they are in the contract's set)
if key_server_contract_address.is_some() {
cached_contract.read_from_registry(&*client, key_server_contract_address);
}
let key_server_set = Arc::new(OnChainKeyServerSet {
contract: Mutex::new(cached_contract),
});
client.add_notify(key_server_set.clone());
Ok(key_server_set)
}
}
impl KeyServerSet for OnChainKeyServerSet {
fn get(&self) -> BTreeMap<Public, SocketAddr> {
self.contract.lock().get()
}
}
impl ChainNotify for OnChainKeyServerSet {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: u64) {
if !enacted.is_empty() || !retracted.is_empty() {
self.contract.lock().update(enacted, retracted)
}
}
}
impl CachedContract {
pub fn new(client: &Arc<Client>, key_servers: BTreeMap<Public, NodeAddress>) -> Result<Self, Error> {
Ok(CachedContract {
client: Arc::downgrade(client),
contract_addr: None,
key_servers: key_servers.into_iter()
.map(|(p, addr)| {
let addr = format!("{}:{}", addr.address, addr.port).parse()
.map_err(|err| Error::Internal(format!("error parsing node address: {}", err)))?;
Ok((p, addr))
})
.collect::<Result<BTreeMap<_, _>, Error>>()?,
})
}
pub fn update(&mut self, enacted: Vec<H256>, retracted: Vec<H256>) {
if let Some(client) = self.client.upgrade() {
let new_contract_addr = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned());
// new contract installed => read nodes set from the contract
if self.contract_addr.as_ref() != new_contract_addr.as_ref() {
self.read_from_registry(&*client, new_contract_addr);
return;
}
// check for contract events
let is_set_changed = self.contract_addr.is_some() && enacted.iter()
.chain(retracted.iter())
.any(|block_hash| !client.logs(Filter {
from_block: BlockId::Hash(block_hash.clone()),
to_block: BlockId::Hash(block_hash.clone()),
address: self.contract_addr.clone().map(|a| vec![a]),
topics: vec![
Some(vec![*ADDED_EVENT_NAME_HASH, *REMOVED_EVENT_NAME_HASH]),
None,
None,
None,
],
limit: Some(1),
}).is_empty());
// to simplify processing, just re-read the whole nodes set from the contract
if is_set_changed {
self.read_from_registry(&*client, new_contract_addr);
}
}
}
pub fn get(&self) -> BTreeMap<Public, SocketAddr> {
self.key_servers.clone()
}
fn read_from_registry(&mut self, client: &Client, new_contract_address: Option<Address>) {
self.key_servers = new_contract_address.map(|contract_addr| {
trace!(target: "secretstore", "Configuring for key server set contract from {}", contract_addr);
KeyServerSetContract::new(contract_addr)
})
.map(|contract| {
let mut key_servers = BTreeMap::new();
let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d));
let key_servers_list = contract.get_key_servers(do_call).wait()
.map_err(|err| { trace!(target: "secretstore", "Error {} reading list of key servers from contract", err); err })
.unwrap_or_default();
for key_server in key_servers_list {
let key_server_public = contract.get_key_server_public(
|a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait()
.and_then(|p| if p.len() == 64 { Ok(Public::from_slice(&p)) } else { Err(format!("Invalid public length {}", p.len())) });
let key_server_ip = contract.get_key_server_address(
|a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait()
.and_then(|a| a.parse().map_err(|e| format!("Invalid ip address: {}", e)));
// only add successfully parsed nodes
match (key_server_public, key_server_ip) {
(Ok(key_server_public), Ok(key_server_ip)) => { key_servers.insert(key_server_public, key_server_ip); },
(Err(public_err), _) => warn!(target: "secretstore_net", "received invalid public from key server set contract: {}", public_err),
(_, Err(ip_err)) => warn!(target: "secretstore_net", "received invalid IP from key server set contract: {}", ip_err),
}
}
key_servers
})
.unwrap_or_default();
self.contract_addr = new_contract_address;
}
}
#[cfg(test)]
pub mod tests {
use std::collections::BTreeMap;
use std::net::SocketAddr;
use ethkey::Public;
use super::KeyServerSet;
#[derive(Default)]
pub struct MapKeyServerSet {
nodes: BTreeMap<Public, SocketAddr>,
}
impl MapKeyServerSet {
pub fn new(nodes: BTreeMap<Public, SocketAddr>) -> Self {
MapKeyServerSet {
nodes: nodes,
}
}
}
impl KeyServerSet for MapKeyServerSet {
fn get(&self) -> BTreeMap<Public, SocketAddr> {
self.nodes.clone()
}
}
}
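A hypothetical usage sketch (not part of the commit) of how cluster code consumes this trait: take a snapshot via get() and drop the self entry before deciding whom to connect to, as ClusterConnections::new does in cluster.rs. The module and test names are invented, and the keys are randomly generated placeholders.

// Hypothetical usage sketch, appended to key_server_set.rs for illustration only.
#[cfg(test)]
mod usage_sketch {
	use std::collections::BTreeMap;
	use std::net::SocketAddr;
	use ethkey::{Public, Random, Generator};
	use super::KeyServerSet;
	use super::tests::MapKeyServerSet;

	#[test]
	fn connect_list_excludes_self() {
		// randomly generated placeholder keys
		let self_public = Random.generate().unwrap().public().clone();
		let other_public = Random.generate().unwrap().public().clone();
		let nodes: BTreeMap<Public, SocketAddr> = vec![
			(self_public.clone(), "127.0.0.1:8083".parse().unwrap()),
			(other_public.clone(), "127.0.0.1:8084".parse().unwrap()),
		].into_iter().collect();

		let set = MapKeyServerSet::new(nodes);
		let mut connect_to = set.get();
		connect_to.remove(&self_public);

		// only the other node remains as a connection candidate
		assert_eq!(connect_to.keys().cloned().collect::<Vec<_>>(), vec![other_public]);
	}
}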

View File

@ -21,6 +21,8 @@ extern crate log;
extern crate futures;
extern crate futures_cpupool;
extern crate hyper;
#[macro_use]
extern crate lazy_static;
extern crate parking_lot;
extern crate rustc_hex;
extern crate serde;
@ -56,6 +58,7 @@ mod http_listener;
mod key_server;
mod key_storage;
mod serialization;
mod key_server_set;
use std::sync::Arc;
use ethcore::client::Client;
@ -68,9 +71,10 @@ pub use traits::{KeyServer};
pub fn start(client: Arc<Client>, config: ServiceConfiguration) -> Result<Box<KeyServer>, Error> {
use std::sync::Arc;
let acl_storage = Arc::new(acl_storage::OnChainAclStorage::new(client));
let acl_storage = acl_storage::OnChainAclStorage::new(&client);
let key_server_set = key_server_set::OnChainKeyServerSet::new(&client, config.cluster_config.nodes.clone())?;
let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(&config)?);
let key_server = key_server::KeyServerImpl::new(&config.cluster_config, acl_storage, key_storage)?;
let key_server = key_server::KeyServerImpl::new(&config.cluster_config, key_server_set, acl_storage, key_storage)?;
let listener = http_listener::KeyServerHttpListener::start(&config.listener_address, key_server)?;
Ok(Box::new(listener))
}