[beta] Backports (#6333)

* overflow check in addition

* add test

* Unexpose methods on UI RPC. (#6295)

* Add more descriptive error when signing/decrypting using hw wallet.

* format instant change proofs correctly

* propagate stratum submit share error upstream, fixes #6258 (#6260)

* updated jsonrpc (#6264)

* Using multiple NTP servers (#6173)

* Small improvements to time estimation.

* Allow multiple NTP servers to be used.

* Removing boxing.

* Be nice.

* Be nicer.

* Update list of servers and add reference.

* Fix dapps CSP when UI is exposed externally (#6178)

* Allow embeding on any page when ui-hosts=all and fix dev_ui

* Fix tests.

* Fix cache path when using --base-path (#6212)

* Time should not contribue to overall status. (#6276)

* v1.7.1
This commit is contained in:
Arkadiy Paronyan
2017-08-19 22:10:19 +02:00
committed by GitHub
parent 75eb542275
commit 4992064663
23 changed files with 343 additions and 148 deletions

View File

@@ -21,7 +21,7 @@ use hyper::method::Method;
use hyper::status::StatusCode;
use api::{response, types};
use api::time::TimeChecker;
use api::time::{TimeChecker, MAX_DRIFT};
use apps::fetcher::Fetcher;
use handlers::{self, extract_url};
use endpoint::{Endpoint, Handler, EndpointPath};
@@ -122,7 +122,6 @@ impl RestApiRouter {
// Check time
let time = {
const MAX_DRIFT: i64 = 500;
let (status, message, details) = match time {
Ok(Ok(diff)) if diff < MAX_DRIFT && diff > -MAX_DRIFT => {
(HealthStatus::Ok, "".into(), diff)

View File

@@ -33,10 +33,12 @@
use std::io;
use std::{fmt, mem, time};
use std::sync::atomic::{self, AtomicUsize};
use std::collections::VecDeque;
use futures::{self, Future, BoxFuture};
use futures_cpupool::CpuPool;
use futures::future::{self, IntoFuture};
use futures_cpupool::{CpuPool, CpuFuture};
use ntp;
use time::{Duration, Timespec};
use util::{Arc, RwLock};
@@ -44,6 +46,8 @@ use util::{Arc, RwLock};
/// Time checker error.
#[derive(Debug, Clone, PartialEq)]
pub enum Error {
/// No servers are currently available for a query.
NoServersAvailable,
/// There was an error when trying to reach the NTP server.
Ntp(String),
/// IO error when reading NTP response.
@@ -55,6 +59,7 @@ impl fmt::Display for Error {
use self::Error::*;
match *self {
NoServersAvailable => write!(fmt, "No NTP servers available"),
Ntp(ref err) => write!(fmt, "NTP error: {}", err),
Io(ref err) => write!(fmt, "Connection Error: {}", err),
}
@@ -71,58 +76,123 @@ impl From<ntp::errors::Error> for Error {
/// NTP time drift checker.
pub trait Ntp {
/// Returned Future.
type Future: IntoFuture<Item=Duration, Error=Error>;
/// Returns the current time drift.
fn drift(&self) -> BoxFuture<Duration, Error>;
fn drift(&self) -> Self::Future;
}
const SERVER_MAX_POLL_INTERVAL_SECS: u64 = 60;
#[derive(Debug)]
struct Server {
pub address: String,
next_call: RwLock<time::Instant>,
failures: AtomicUsize,
}
impl Server {
pub fn is_available(&self) -> bool {
*self.next_call.read() < time::Instant::now()
}
pub fn report_success(&self) {
self.failures.store(0, atomic::Ordering::SeqCst);
self.update_next_call(1)
}
pub fn report_failure(&self) {
let errors = self.failures.fetch_add(1, atomic::Ordering::SeqCst);
self.update_next_call(1 << errors)
}
fn update_next_call(&self, delay: usize) {
*self.next_call.write() = time::Instant::now() + time::Duration::from_secs(delay as u64 * SERVER_MAX_POLL_INTERVAL_SECS);
}
}
impl<T: AsRef<str>> From<T> for Server {
fn from(t: T) -> Self {
Server {
address: t.as_ref().to_owned(),
next_call: RwLock::new(time::Instant::now()),
failures: Default::default(),
}
}
}
/// NTP client using the SNTP algorithm for calculating drift.
#[derive(Clone)]
pub struct SimpleNtp {
address: Arc<String>,
addresses: Vec<Arc<Server>>,
pool: CpuPool,
}
impl fmt::Debug for SimpleNtp {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Ntp {{ address: {} }}", self.address)
f
.debug_struct("SimpleNtp")
.field("addresses", &self.addresses)
.finish()
}
}
impl SimpleNtp {
fn new(address: &str, pool: CpuPool) -> SimpleNtp {
fn new<T: AsRef<str>>(addresses: &[T], pool: CpuPool) -> SimpleNtp {
SimpleNtp {
address: Arc::new(address.to_owned()),
addresses: addresses.iter().map(Server::from).map(Arc::new).collect(),
pool: pool,
}
}
}
impl Ntp for SimpleNtp {
fn drift(&self) -> BoxFuture<Duration, Error> {
let address = self.address.clone();
if &*address == "none" {
return futures::future::err(Error::Ntp("NTP server is not provided.".into())).boxed();
}
type Future = future::Either<
CpuFuture<Duration, Error>,
future::FutureResult<Duration, Error>,
>;
self.pool.spawn_fn(move || {
let packet = ntp::request(&*address)?;
let dest_time = ::time::now_utc().to_timespec();
let orig_time = Timespec::from(packet.orig_time);
let recv_time = Timespec::from(packet.recv_time);
let transmit_time = Timespec::from(packet.transmit_time);
fn drift(&self) -> Self::Future {
use self::future::Either::{A, B};
let drift = ((recv_time - orig_time) + (transmit_time - dest_time)) / 2;
let server = self.addresses.iter().find(|server| server.is_available());
server.map(|server| {
let server = server.clone();
A(self.pool.spawn_fn(move || {
debug!(target: "dapps", "Fetching time from {}.", server.address);
Ok(drift)
}).boxed()
match ntp::request(&server.address) {
Ok(packet) => {
let dest_time = ::time::now_utc().to_timespec();
let orig_time = Timespec::from(packet.orig_time);
let recv_time = Timespec::from(packet.recv_time);
let transmit_time = Timespec::from(packet.transmit_time);
let drift = ((recv_time - orig_time) + (transmit_time - dest_time)) / 2;
server.report_success();
Ok(drift)
},
Err(err) => {
server.report_failure();
Err(err.into())
},
}
}))
}).unwrap_or_else(|| B(future::err(Error::NoServersAvailable)))
}
}
// NOTE In a positive scenario first results will be seen after:
// MAX_RESULTS * UPDATE_TIMEOUT_OK_SECS seconds.
const MAX_RESULTS: usize = 7;
const UPDATE_TIMEOUT_OK_SECS: u64 = 30;
const UPDATE_TIMEOUT_ERR_SECS: u64 = 2;
// MAX_RESULTS * UPDATE_TIMEOUT_INCOMPLETE_SECS seconds.
const MAX_RESULTS: usize = 4;
const UPDATE_TIMEOUT_OK_SECS: u64 = 6 * 60 * 60;
const UPDATE_TIMEOUT_WARN_SECS: u64 = 15 * 60;
const UPDATE_TIMEOUT_ERR_SECS: u64 = 60;
const UPDATE_TIMEOUT_INCOMPLETE_SECS: u64 = 10;
/// Maximal valid time drift.
pub const MAX_DRIFT: i64 = 500;
#[derive(Debug, Clone)]
/// A time checker.
@@ -133,13 +203,13 @@ pub struct TimeChecker<N: Ntp = SimpleNtp> {
impl TimeChecker<SimpleNtp> {
/// Creates new time checker given the NTP server address.
pub fn new(ntp_address: String, pool: CpuPool) -> Self {
pub fn new<T: AsRef<str>>(ntp_addresses: &[T], pool: CpuPool) -> Self {
let last_result = Arc::new(RwLock::new(
// Assume everything is ok at the very beginning.
(time::Instant::now(), vec![Ok(0)].into())
));
let ntp = SimpleNtp::new(&ntp_address, pool);
let ntp = SimpleNtp::new(ntp_addresses, pool);
TimeChecker {
ntp,
@@ -148,22 +218,34 @@ impl TimeChecker<SimpleNtp> {
}
}
impl<N: Ntp> TimeChecker<N> {
impl<N: Ntp> TimeChecker<N> where <N::Future as IntoFuture>::Future: Send + 'static {
/// Updates the time
pub fn update(&self) -> BoxFuture<i64, Error> {
trace!(target: "dapps", "Updating time from NTP.");
let last_result = self.last_result.clone();
self.ntp.drift().then(move |res| {
self.ntp.drift().into_future().then(move |res| {
let res = res.map(|d| d.num_milliseconds());
if let Err(Error::NoServersAvailable) = res {
debug!(target: "dapps", "No NTP servers available. Selecting an older result.");
return select_result(last_result.read().1.iter());
}
// Update the results.
let mut results = mem::replace(&mut last_result.write().1, VecDeque::new());
let has_all_results = results.len() >= MAX_RESULTS;
let valid_till = time::Instant::now() + time::Duration::from_secs(
if res.is_ok() && results.len() == MAX_RESULTS {
UPDATE_TIMEOUT_OK_SECS
} else {
UPDATE_TIMEOUT_ERR_SECS
match res {
Ok(time) if has_all_results && time < MAX_DRIFT => UPDATE_TIMEOUT_OK_SECS,
Ok(_) if has_all_results => UPDATE_TIMEOUT_WARN_SECS,
Err(_) if has_all_results => UPDATE_TIMEOUT_ERR_SECS,
_ => UPDATE_TIMEOUT_INCOMPLETE_SECS,
}
);
trace!(target: "dapps", "New time drift received: {:?}", res);
// Push the result.
results.push_back(res.map(|d| d.num_milliseconds()));
results.push_back(res);
while results.len() > MAX_RESULTS {
results.pop_front();
}
@@ -208,7 +290,7 @@ mod tests {
use std::cell::{Cell, RefCell};
use std::time::Instant;
use time::Duration;
use futures::{self, BoxFuture, Future};
use futures::{future, Future};
use super::{Ntp, TimeChecker, Error};
use util::RwLock;
@@ -223,9 +305,11 @@ mod tests {
}
impl Ntp for FakeNtp {
fn drift(&self) -> BoxFuture<Duration, Error> {
type Future = future::FutureResult<Duration, Error>;
fn drift(&self) -> Self::Future {
self.1.set(self.1.get() + 1);
futures::future::ok(self.0.borrow_mut().pop().expect("Unexpected call to drift().")).boxed()
future::ok(self.0.borrow_mut().pop().expect("Unexpected call to drift()."))
}
}

View File

@@ -67,10 +67,20 @@ pub fn add_security_headers(headers: &mut header::Headers, embeddable_on: Embedd
// Allow fonts from data: and HTTPS.
b"font-src 'self' data: https:;".to_vec(),
// Allow inline scripts and scripts eval (webpack/jsconsole)
b"script-src 'self' 'unsafe-inline' 'unsafe-eval';".to_vec(),
// Same restrictions as script-src (fallback) with additional
{
let script_src = embeddable_on.as_ref()
.map(|e| e.extra_script_src.iter()
.map(|&(ref host, port)| address(host, port))
.join(" ")
).unwrap_or_default();
format!(
"script-src 'self' 'unsafe-inline' 'unsafe-eval' {};",
script_src
).into_bytes()
},
// Same restrictions as script-src with additional
// blob: that is required for camera access (worker)
b"worker-src 'self' 'unsafe-inline' 'unsafe-eval' blob: ;".to_vec(),
b"worker-src 'self' 'unsafe-inline' 'unsafe-eval' https: blob:;".to_vec(),
// Restrict everything else to the same origin.
b"default-src 'self';".to_vec(),
// Run in sandbox mode (although it's not fully safe since we allow same-origin and script)
@@ -90,7 +100,7 @@ pub fn add_security_headers(headers: &mut header::Headers, embeddable_on: Embedd
.into_iter()
.chain(embed.extra_embed_on
.iter()
.map(|&(ref host, port)| format!("{}:{}", host, port))
.map(|&(ref host, port)| address(host, port))
);
let ancestors = if embed.host == "127.0.0.1" {

View File

@@ -130,7 +130,7 @@ impl Middleware {
/// Creates new middleware for UI server.
pub fn ui<F: Fetch>(
ntp_server: &str,
ntp_servers: &[String],
pool: CpuPool,
remote: Remote,
dapps_domain: &str,
@@ -146,7 +146,7 @@ impl Middleware {
).embeddable_on(None).allow_dapps(false));
let special = {
let mut special = special_endpoints(
ntp_server,
ntp_servers,
pool,
content_fetcher.clone(),
remote.clone(),
@@ -171,11 +171,12 @@ impl Middleware {
/// Creates new Dapps server middleware.
pub fn dapps<F: Fetch>(
ntp_server: &str,
ntp_servers: &[String],
pool: CpuPool,
remote: Remote,
ui_address: Option<(String, u16)>,
extra_embed_on: Vec<(String, u16)>,
extra_script_src: Vec<(String, u16)>,
dapps_path: PathBuf,
extra_dapps: Vec<PathBuf>,
dapps_domain: &str,
@@ -184,7 +185,7 @@ impl Middleware {
web_proxy_tokens: Arc<WebProxyTokens>,
fetch: F,
) -> Self {
let embeddable = as_embeddable(ui_address, extra_embed_on, dapps_domain);
let embeddable = as_embeddable(ui_address, extra_embed_on, extra_script_src, dapps_domain);
let content_fetcher = Arc::new(apps::fetcher::ContentFetcher::new(
hash_fetch::urlhint::URLHintContract::new(registrar),
sync_status.clone(),
@@ -203,7 +204,7 @@ impl Middleware {
let special = {
let mut special = special_endpoints(
ntp_server,
ntp_servers,
pool,
content_fetcher.clone(),
remote.clone(),
@@ -237,8 +238,8 @@ impl http::RequestMiddleware for Middleware {
}
}
fn special_endpoints(
ntp_server: &str,
fn special_endpoints<T: AsRef<str>>(
ntp_servers: &[T],
pool: CpuPool,
content_fetcher: Arc<apps::fetcher::Fetcher>,
remote: Remote,
@@ -250,7 +251,7 @@ fn special_endpoints(
special.insert(router::SpecialEndpoint::Api, Some(api::RestApi::new(
content_fetcher,
sync_status,
api::TimeChecker::new(ntp_server.into(), pool),
api::TimeChecker::new(ntp_servers, pool),
remote,
)));
special
@@ -263,12 +264,14 @@ fn address(host: &str, port: u16) -> String {
fn as_embeddable(
ui_address: Option<(String, u16)>,
extra_embed_on: Vec<(String, u16)>,
extra_script_src: Vec<(String, u16)>,
dapps_domain: &str,
) -> Option<ParentFrameSettings> {
ui_address.map(|(host, port)| ParentFrameSettings {
host,
port,
extra_embed_on,
extra_script_src,
dapps_domain: dapps_domain.to_owned(),
})
}
@@ -289,8 +292,10 @@ pub struct ParentFrameSettings {
pub host: String,
/// Port
pub port: u16,
/// Additional pages the pages can be embedded on.
/// Additional URLs the dapps can be embedded on.
pub extra_embed_on: Vec<(String, u16)>,
/// Additional URLs the dapp scripts can be loaded from.
pub extra_script_src: Vec<(String, u16)>,
/// Dapps Domain (web3.site)
pub dapps_domain: String,
}

View File

@@ -255,11 +255,12 @@ impl Server {
fetch: F,
) -> Result<Server, http::Error> {
let middleware = Middleware::dapps(
"pool.ntp.org:123",
&["0.pool.ntp.org:123".into(), "1.pool.ntp.org:123".into()],
CpuPool::new(4),
remote,
signer_address,
vec![],
vec![],
dapps_path,
extra_dapps,
DAPPS_DOMAIN.into(),