Node Health warnings (#5951)

* Health endpoint.

* Asynchronous health endpoint.

* Configure time api URL via CLI.

* Tests for TimeChecker.

* Health indication on Status page.

* Adding status indication to tab titles.

* Add status to ParityBar.

* Fixing lints.

* Add health status on SyncWarning.

* Fix health URL for embed.

* Nicer messages.

* Fix tests.

* Fixing JS tests.

* NTP time sync (#5956)

* use NTP to check time drift

* update time module documentation

* replace time_api flag with ntp_server

* fix TimeChecker tests

* fix ntp-server flag usage

* hide status tooltip if there's no message to show

* remove TimeProvider trait

* use Cell in FakeNtp test trait

* share fetch client and ntp client cpu pool

* Add documentation to public method.

* Removing peer count from status.

* Remove unknown upgrade status.

* Send two time requests at the time.

* Revert "Send two time requests at the time."

This reverts commit f7b754b1155076a5a5d8fdafa022801fae324452.

* Defer reporting time synchronization issues.

* Fix tests.

* Fix linting.
This commit is contained in:
Tomasz Drwięga
2017-07-11 12:23:46 +02:00
committed by Gav Wood
parent 7fb46bff06
commit 4936e99f30
48 changed files with 1296 additions and 125 deletions

View File

@@ -18,23 +18,36 @@ use std::sync::Arc;
use hyper::{server, net, Decoder, Encoder, Next, Control};
use hyper::method::Method;
use hyper::status::StatusCode;
use api::types::ApiError;
use api::response;
use api::{response, types};
use api::time::TimeChecker;
use apps::fetcher::Fetcher;
use handlers::extract_url;
use handlers::{self, extract_url};
use endpoint::{Endpoint, Handler, EndpointPath};
use parity_reactor::Remote;
use {SyncStatus};
#[derive(Clone)]
pub struct RestApi {
fetcher: Arc<Fetcher>,
sync_status: Arc<SyncStatus>,
time: TimeChecker,
remote: Remote,
}
impl RestApi {
pub fn new(fetcher: Arc<Fetcher>) -> Box<Endpoint> {
pub fn new(
fetcher: Arc<Fetcher>,
sync_status: Arc<SyncStatus>,
time: TimeChecker,
remote: Remote,
) -> Box<Endpoint> {
Box::new(RestApi {
fetcher: fetcher,
fetcher,
sync_status,
time,
remote,
})
}
}
@@ -58,11 +71,11 @@ impl RestApiRouter {
path: Some(path),
control: Some(control),
api: api,
handler: response::as_json_error(&ApiError {
handler: Box::new(response::as_json_error(StatusCode::NotFound, &types::ApiError {
code: "404".into(),
title: "Not Found".into(),
detail: "Resource you requested has not been found.".into(),
}),
})),
}
}
@@ -75,6 +88,78 @@ impl RestApiRouter {
_ => None
}
}
fn health(&self, control: Control) -> Box<Handler> {
use self::types::{HealthInfo, HealthStatus, Health};
trace!(target: "dapps", "Checking node health.");
// Check timediff
let sync_status = self.api.sync_status.clone();
let map = move |time| {
// Check peers
let peers = {
let (connected, max) = sync_status.peers();
let (status, message) = match connected {
0 => {
(HealthStatus::Bad, "You are not connected to any peers. There is most likely some network issue. Fix connectivity.".into())
},
1 => (HealthStatus::NeedsAttention, "You are connected to only one peer. Your node might not be reliable. Check your network connection.".into()),
_ => (HealthStatus::Ok, "".into()),
};
HealthInfo { status, message, details: (connected, max) }
};
// Check sync
let sync = {
let is_syncing = sync_status.is_major_importing();
let (status, message) = if is_syncing {
(HealthStatus::NeedsAttention, "Your node is still syncing, the values you see might be outdated. Wait until it's fully synced.".into())
} else {
(HealthStatus::Ok, "".into())
};
HealthInfo { status, message, details: is_syncing }
};
// Check time
let time = {
const MAX_DRIFT: i64 = 500;
let (status, message, details) = match time {
Ok(Ok(diff)) if diff < MAX_DRIFT && diff > -MAX_DRIFT => {
(HealthStatus::Ok, "".into(), diff)
},
Ok(Ok(diff)) => {
(HealthStatus::Bad, format!(
"Your clock is not in sync. Detected difference is too big for the protocol to work: {}ms. Synchronize your clock.",
diff,
), diff)
},
Ok(Err(err)) => {
(HealthStatus::NeedsAttention, format!(
"Unable to reach time API: {}. Make sure that your clock is synchronized.",
err,
), 0)
},
Err(_) => {
(HealthStatus::NeedsAttention, "Time API request timed out. Make sure that the clock is synchronized.".into(), 0)
},
};
HealthInfo { status, message, details, }
};
let status = if [&peers.status, &sync.status, &time.status].iter().any(|x| *x != &HealthStatus::Ok) {
StatusCode::PreconditionFailed // HTTP 412
} else {
StatusCode::Ok // HTTP 200
};
response::as_json(status, &Health { peers, sync, time })
};
let time = self.api.time.time_drift();
let remote = self.api.remote.clone();
Box::new(handlers::AsyncHandler::new(time, map, remote, control))
}
}
impl server::Handler<net::HttpStream> for RestApiRouter {
@@ -103,6 +188,7 @@ impl server::Handler<net::HttpStream> for RestApiRouter {
let handler = endpoint.and_then(|v| match v {
"ping" => Some(response::ping()),
"health" => Some(self.health(control)),
"content" => self.resolve_content(hash, path, control),
_ => None
});

View File

@@ -18,6 +18,8 @@
mod api;
mod response;
mod time;
mod types;
pub use self::api::RestApi;
pub use self::time::TimeChecker;

View File

@@ -16,6 +16,8 @@
use serde::Serialize;
use serde_json;
use hyper::status::StatusCode;
use endpoint::Handler;
use handlers::{ContentHandler, EchoHandler};
@@ -23,10 +25,16 @@ pub fn empty() -> Box<Handler> {
Box::new(ContentHandler::ok("".into(), mime!(Text/Plain)))
}
pub fn as_json_error<T: Serialize>(val: &T) -> Box<Handler> {
pub fn as_json<T: Serialize>(status: StatusCode, val: &T) -> ContentHandler {
let json = serde_json::to_string(val)
.expect("serialization to string is infallible; qed");
Box::new(ContentHandler::not_found(json, mime!(Application/Json)))
ContentHandler::new(status, json, mime!(Application/Json))
}
pub fn as_json_error<T: Serialize>(status: StatusCode, val: &T) -> ContentHandler {
let json = serde_json::to_string(val)
.expect("serialization to string is infallible; qed");
ContentHandler::new(status, json, mime!(Application/Json))
}
pub fn ping() -> Box<Handler> {

264
dapps/src/api/time.rs Normal file
View File

@@ -0,0 +1,264 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Periodically checks node's time drift using [SNTP](https://tools.ietf.org/html/rfc1769).
//!
//! An NTP packet is sent to the server with a local timestamp, the server then completes the packet, yielding the
//! following timestamps:
//!
//! Timestamp Name ID When Generated
//! ------------------------------------------------------------
//! Originate Timestamp T1 time request sent by client
//! Receive Timestamp T2 time request received at server
//! Transmit Timestamp T3 time reply sent by server
//! Destination Timestamp T4 time reply received at client
//!
//! The drift is defined as:
//!
//! drift = ((T2 - T1) + (T3 - T4)) / 2.
//!
use std::io;
use std::{fmt, mem, time};
use std::collections::VecDeque;
use futures::{self, Future, BoxFuture};
use futures_cpupool::CpuPool;
use ntp;
use time::{Duration, Timespec};
use util::{Arc, RwLock};
/// Time checker error.
#[derive(Debug, Clone, PartialEq)]
pub enum Error {
/// There was an error when trying to reach the NTP server.
Ntp(String),
/// IO error when reading NTP response.
Io(String),
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use self::Error::*;
match *self {
Ntp(ref err) => write!(fmt, "NTP error: {}", err),
Io(ref err) => write!(fmt, "Connection Error: {}", err),
}
}
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Self { Error::Io(format!("{}", err)) }
}
impl From<ntp::errors::Error> for Error {
fn from(err: ntp::errors::Error) -> Self { Error::Ntp(format!("{}", err)) }
}
/// NTP time drift checker.
pub trait Ntp {
/// Returns the current time drift.
fn drift(&self) -> BoxFuture<Duration, Error>;
}
/// NTP client using the SNTP algorithm for calculating drift.
#[derive(Clone)]
pub struct SimpleNtp {
address: Arc<String>,
pool: CpuPool,
}
impl fmt::Debug for SimpleNtp {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Ntp {{ address: {} }}", self.address)
}
}
impl SimpleNtp {
fn new(address: &str, pool: CpuPool) -> SimpleNtp {
SimpleNtp {
address: Arc::new(address.to_owned()),
pool: pool,
}
}
}
impl Ntp for SimpleNtp {
fn drift(&self) -> BoxFuture<Duration, Error> {
let address = self.address.clone();
self.pool.spawn_fn(move || {
let packet = ntp::request(&*address)?;
let dest_time = ::time::now_utc().to_timespec();
let orig_time = Timespec::from(packet.orig_time);
let recv_time = Timespec::from(packet.recv_time);
let transmit_time = Timespec::from(packet.transmit_time);
let drift = ((recv_time - orig_time) + (transmit_time - dest_time)) / 2;
Ok(drift)
}).boxed()
}
}
const MAX_RESULTS: usize = 4;
const UPDATE_TIMEOUT_OK_SECS: u64 = 30;
const UPDATE_TIMEOUT_ERR_SECS: u64 = 2;
#[derive(Debug, Clone)]
/// A time checker.
pub struct TimeChecker<N: Ntp = SimpleNtp> {
ntp: N,
last_result: Arc<RwLock<(time::Instant, VecDeque<Result<i64, Error>>)>>,
}
impl TimeChecker<SimpleNtp> {
/// Creates new time checker given the NTP server address.
pub fn new(ntp_address: String, pool: CpuPool) -> Self {
let last_result = Arc::new(RwLock::new(
// Assume everything is ok at the very beginning.
(time::Instant::now(), vec![Ok(0)].into())
));
let ntp = SimpleNtp::new(&ntp_address, pool);
TimeChecker {
ntp,
last_result,
}
}
}
impl<N: Ntp> TimeChecker<N> {
/// Updates the time
pub fn update(&self) -> BoxFuture<i64, Error> {
let last_result = self.last_result.clone();
self.ntp.drift().then(move |res| {
let mut results = mem::replace(&mut last_result.write().1, VecDeque::new());
let valid_till = time::Instant::now() + time::Duration::from_secs(
if res.is_ok() && results.len() == MAX_RESULTS {
UPDATE_TIMEOUT_OK_SECS
} else {
UPDATE_TIMEOUT_ERR_SECS
}
);
// Push the result.
results.push_back(res.map(|d| d.num_milliseconds()));
while results.len() > MAX_RESULTS {
results.pop_front();
}
// Select a response and update last result.
let res = select_result(results.iter());
*last_result.write() = (valid_till, results);
res
}).boxed()
}
/// Returns a current time drift or error if last request to NTP server failed.
pub fn time_drift(&self) -> BoxFuture<i64, Error> {
// return cached result
{
let res = self.last_result.read();
if res.0 > time::Instant::now() {
return futures::done(select_result(res.1.iter())).boxed();
}
}
// or update and return result
self.update()
}
}
fn select_result<'a, T: Iterator<Item=&'a Result<i64, Error>>>(results: T) -> Result<i64, Error> {
let mut min = None;
for res in results {
min = Some(match (min.take(), res) {
(Some(Ok(min)), &Ok(ref new)) => Ok(::std::cmp::min(min, *new)),
(Some(Ok(old)), &Err(_)) => Ok(old),
(_, ref new) => (*new).clone(),
})
}
min.unwrap_or_else(|| Err(Error::Ntp("NTP server unavailable.".into())))
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::cell::{Cell, RefCell};
use std::time::Instant;
use time::Duration;
use futures::{self, BoxFuture, Future};
use super::{Ntp, TimeChecker, Error};
use util::RwLock;
#[derive(Clone)]
struct FakeNtp(RefCell<Vec<Duration>>, Cell<u64>);
impl FakeNtp {
fn new() -> FakeNtp {
FakeNtp(
RefCell::new(vec![Duration::milliseconds(150)]),
Cell::new(0))
}
}
impl Ntp for FakeNtp {
fn drift(&self) -> BoxFuture<Duration, Error> {
self.1.set(self.1.get() + 1);
futures::future::ok(self.0.borrow_mut().pop().expect("Unexpected call to drift().")).boxed()
}
}
fn time_checker() -> TimeChecker<FakeNtp> {
let last_result = Arc::new(RwLock::new(
(Instant::now(), vec![Err(Error::Ntp("NTP server unavailable.".into()))].into())
));
TimeChecker {
ntp: FakeNtp::new(),
last_result: last_result,
}
}
#[test]
fn should_fetch_time_on_start() {
// given
let time = time_checker();
// when
let diff = time.time_drift().wait().unwrap();
// then
assert_eq!(diff, 150);
assert_eq!(time.ntp.1.get(), 1);
}
#[test]
fn should_not_fetch_twice_if_timeout_has_not_passed() {
// given
let time = time_checker();
// when
let diff1 = time.time_drift().wait().unwrap();
let diff2 = time.time_drift().wait().unwrap();
// then
assert_eq!(diff1, 150);
assert_eq!(diff2, 150);
assert_eq!(time.ntp.1.get(), 1);
}
}

View File

@@ -14,11 +14,54 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
/// A structure representing any error in REST API.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ApiError {
/// Error code.
pub code: String,
/// Human-readable error summary.
pub title: String,
/// More technical error details.
pub detail: String,
}
/// Health API endpoint status.
#[derive(Debug, PartialEq, Serialize)]
pub enum HealthStatus {
/// Everything's OK.
#[serde(rename = "ok")]
Ok,
/// Node health need attention
/// (the issue is not critical, but may need investigation)
#[serde(rename = "needsAttention")]
NeedsAttention,
/// There is something bad detected with the node.
#[serde(rename = "bad")]
Bad
}
/// Represents a single check in node health.
/// Cointains the status of that check and apropriate message and details.
#[derive(Debug, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct HealthInfo<T> {
/// Check status.
pub status: HealthStatus,
/// Human-readable message.
pub message: String,
/// Technical details of the check.
pub details: T,
}
/// Node Health status.
#[derive(Debug, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Health {
/// Status of peers.
pub peers: HealthInfo<(usize, usize)>,
/// Sync status.
pub sync: HealthInfo<bool>,
/// Time diff info.
pub time: HealthInfo<i64>,
}