Using multiple NTP servers (#6173)
* Small improvements to time estimation. * Allow multiple NTP servers to be used. * Removing boxing. * Be nice. * Be nicer. * Update list of servers and add reference.
This commit is contained in:
committed by
Marek Kotewicz
parent
72fa6a79a2
commit
e93466c897
@@ -21,7 +21,7 @@ use hyper::method::Method;
|
||||
use hyper::status::StatusCode;
|
||||
|
||||
use api::{response, types};
|
||||
use api::time::TimeChecker;
|
||||
use api::time::{TimeChecker, MAX_DRIFT};
|
||||
use apps::fetcher::Fetcher;
|
||||
use handlers::{self, extract_url};
|
||||
use endpoint::{Endpoint, Handler, EndpointPath};
|
||||
@@ -122,7 +122,6 @@ impl RestApiRouter {
|
||||
|
||||
// Check time
|
||||
let time = {
|
||||
const MAX_DRIFT: i64 = 500;
|
||||
let (status, message, details) = match time {
|
||||
Ok(Ok(diff)) if diff < MAX_DRIFT && diff > -MAX_DRIFT => {
|
||||
(HealthStatus::Ok, "".into(), diff)
|
||||
|
||||
@@ -33,11 +33,13 @@
|
||||
|
||||
use std::io;
|
||||
use std::{fmt, mem, time};
|
||||
use std::sync::Arc;
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::atomic::{self, AtomicUsize};
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::{self, Future, BoxFuture};
|
||||
use futures_cpupool::CpuPool;
|
||||
use futures::future::{self, IntoFuture};
|
||||
use futures_cpupool::{CpuPool, CpuFuture};
|
||||
use ntp;
|
||||
use time::{Duration, Timespec};
|
||||
use util::RwLock;
|
||||
@@ -45,6 +47,8 @@ use util::RwLock;
|
||||
/// Time checker error.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum Error {
|
||||
/// No servers are currently available for a query.
|
||||
NoServersAvailable,
|
||||
/// There was an error when trying to reach the NTP server.
|
||||
Ntp(String),
|
||||
/// IO error when reading NTP response.
|
||||
@@ -56,6 +60,7 @@ impl fmt::Display for Error {
|
||||
use self::Error::*;
|
||||
|
||||
match *self {
|
||||
NoServersAvailable => write!(fmt, "No NTP servers available"),
|
||||
Ntp(ref err) => write!(fmt, "NTP error: {}", err),
|
||||
Io(ref err) => write!(fmt, "Connection Error: {}", err),
|
||||
}
|
||||
@@ -72,58 +77,123 @@ impl From<ntp::errors::Error> for Error {
|
||||
|
||||
/// NTP time drift checker.
|
||||
pub trait Ntp {
|
||||
/// Returned Future.
|
||||
type Future: IntoFuture<Item=Duration, Error=Error>;
|
||||
|
||||
/// Returns the current time drift.
|
||||
fn drift(&self) -> BoxFuture<Duration, Error>;
|
||||
fn drift(&self) -> Self::Future;
|
||||
}
|
||||
|
||||
const SERVER_MAX_POLL_INTERVAL_SECS: u64 = 60;
|
||||
#[derive(Debug)]
|
||||
struct Server {
|
||||
pub address: String,
|
||||
next_call: RwLock<time::Instant>,
|
||||
failures: AtomicUsize,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub fn is_available(&self) -> bool {
|
||||
*self.next_call.read() < time::Instant::now()
|
||||
}
|
||||
|
||||
pub fn report_success(&self) {
|
||||
self.failures.store(0, atomic::Ordering::SeqCst);
|
||||
self.update_next_call(1)
|
||||
}
|
||||
|
||||
pub fn report_failure(&self) {
|
||||
let errors = self.failures.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
self.update_next_call(1 << errors)
|
||||
}
|
||||
|
||||
fn update_next_call(&self, delay: usize) {
|
||||
*self.next_call.write() = time::Instant::now() + time::Duration::from_secs(delay as u64 * SERVER_MAX_POLL_INTERVAL_SECS);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsRef<str>> From<T> for Server {
|
||||
fn from(t: T) -> Self {
|
||||
Server {
|
||||
address: t.as_ref().to_owned(),
|
||||
next_call: RwLock::new(time::Instant::now()),
|
||||
failures: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// NTP client using the SNTP algorithm for calculating drift.
|
||||
#[derive(Clone)]
|
||||
pub struct SimpleNtp {
|
||||
address: Arc<String>,
|
||||
addresses: Vec<Arc<Server>>,
|
||||
pool: CpuPool,
|
||||
}
|
||||
|
||||
impl fmt::Debug for SimpleNtp {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "Ntp {{ address: {} }}", self.address)
|
||||
f
|
||||
.debug_struct("SimpleNtp")
|
||||
.field("addresses", &self.addresses)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl SimpleNtp {
|
||||
fn new(address: &str, pool: CpuPool) -> SimpleNtp {
|
||||
fn new<T: AsRef<str>>(addresses: &[T], pool: CpuPool) -> SimpleNtp {
|
||||
SimpleNtp {
|
||||
address: Arc::new(address.to_owned()),
|
||||
addresses: addresses.iter().map(Server::from).map(Arc::new).collect(),
|
||||
pool: pool,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Ntp for SimpleNtp {
|
||||
fn drift(&self) -> BoxFuture<Duration, Error> {
|
||||
let address = self.address.clone();
|
||||
if &*address == "none" {
|
||||
return futures::future::err(Error::Ntp("NTP server is not provided.".into())).boxed();
|
||||
}
|
||||
type Future = future::Either<
|
||||
CpuFuture<Duration, Error>,
|
||||
future::FutureResult<Duration, Error>,
|
||||
>;
|
||||
|
||||
self.pool.spawn_fn(move || {
|
||||
let packet = ntp::request(&*address)?;
|
||||
let dest_time = ::time::now_utc().to_timespec();
|
||||
let orig_time = Timespec::from(packet.orig_time);
|
||||
let recv_time = Timespec::from(packet.recv_time);
|
||||
let transmit_time = Timespec::from(packet.transmit_time);
|
||||
fn drift(&self) -> Self::Future {
|
||||
use self::future::Either::{A, B};
|
||||
|
||||
let drift = ((recv_time - orig_time) + (transmit_time - dest_time)) / 2;
|
||||
let server = self.addresses.iter().find(|server| server.is_available());
|
||||
server.map(|server| {
|
||||
let server = server.clone();
|
||||
A(self.pool.spawn_fn(move || {
|
||||
debug!(target: "dapps", "Fetching time from {}.", server.address);
|
||||
|
||||
Ok(drift)
|
||||
}).boxed()
|
||||
match ntp::request(&server.address) {
|
||||
Ok(packet) => {
|
||||
let dest_time = ::time::now_utc().to_timespec();
|
||||
let orig_time = Timespec::from(packet.orig_time);
|
||||
let recv_time = Timespec::from(packet.recv_time);
|
||||
let transmit_time = Timespec::from(packet.transmit_time);
|
||||
|
||||
let drift = ((recv_time - orig_time) + (transmit_time - dest_time)) / 2;
|
||||
|
||||
server.report_success();
|
||||
Ok(drift)
|
||||
},
|
||||
Err(err) => {
|
||||
server.report_failure();
|
||||
Err(err.into())
|
||||
},
|
||||
}
|
||||
}))
|
||||
}).unwrap_or_else(|| B(future::err(Error::NoServersAvailable)))
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE In a positive scenario first results will be seen after:
|
||||
// MAX_RESULTS * UPDATE_TIMEOUT_OK_SECS seconds.
|
||||
const MAX_RESULTS: usize = 7;
|
||||
const UPDATE_TIMEOUT_OK_SECS: u64 = 30;
|
||||
const UPDATE_TIMEOUT_ERR_SECS: u64 = 2;
|
||||
// MAX_RESULTS * UPDATE_TIMEOUT_INCOMPLETE_SECS seconds.
|
||||
const MAX_RESULTS: usize = 4;
|
||||
const UPDATE_TIMEOUT_OK_SECS: u64 = 6 * 60 * 60;
|
||||
const UPDATE_TIMEOUT_WARN_SECS: u64 = 15 * 60;
|
||||
const UPDATE_TIMEOUT_ERR_SECS: u64 = 60;
|
||||
const UPDATE_TIMEOUT_INCOMPLETE_SECS: u64 = 10;
|
||||
|
||||
/// Maximal valid time drift.
|
||||
pub const MAX_DRIFT: i64 = 500;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
/// A time checker.
|
||||
@@ -134,13 +204,13 @@ pub struct TimeChecker<N: Ntp = SimpleNtp> {
|
||||
|
||||
impl TimeChecker<SimpleNtp> {
|
||||
/// Creates new time checker given the NTP server address.
|
||||
pub fn new(ntp_address: String, pool: CpuPool) -> Self {
|
||||
pub fn new<T: AsRef<str>>(ntp_addresses: &[T], pool: CpuPool) -> Self {
|
||||
let last_result = Arc::new(RwLock::new(
|
||||
// Assume everything is ok at the very beginning.
|
||||
(time::Instant::now(), vec![Ok(0)].into())
|
||||
));
|
||||
|
||||
let ntp = SimpleNtp::new(&ntp_address, pool);
|
||||
let ntp = SimpleNtp::new(ntp_addresses, pool);
|
||||
|
||||
TimeChecker {
|
||||
ntp,
|
||||
@@ -149,22 +219,34 @@ impl TimeChecker<SimpleNtp> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: Ntp> TimeChecker<N> {
|
||||
impl<N: Ntp> TimeChecker<N> where <N::Future as IntoFuture>::Future: Send + 'static {
|
||||
/// Updates the time
|
||||
pub fn update(&self) -> BoxFuture<i64, Error> {
|
||||
trace!(target: "dapps", "Updating time from NTP.");
|
||||
let last_result = self.last_result.clone();
|
||||
self.ntp.drift().then(move |res| {
|
||||
self.ntp.drift().into_future().then(move |res| {
|
||||
let res = res.map(|d| d.num_milliseconds());
|
||||
|
||||
if let Err(Error::NoServersAvailable) = res {
|
||||
debug!(target: "dapps", "No NTP servers available. Selecting an older result.");
|
||||
return select_result(last_result.read().1.iter());
|
||||
}
|
||||
|
||||
// Update the results.
|
||||
let mut results = mem::replace(&mut last_result.write().1, VecDeque::new());
|
||||
let has_all_results = results.len() >= MAX_RESULTS;
|
||||
let valid_till = time::Instant::now() + time::Duration::from_secs(
|
||||
if res.is_ok() && results.len() == MAX_RESULTS {
|
||||
UPDATE_TIMEOUT_OK_SECS
|
||||
} else {
|
||||
UPDATE_TIMEOUT_ERR_SECS
|
||||
match res {
|
||||
Ok(time) if has_all_results && time < MAX_DRIFT => UPDATE_TIMEOUT_OK_SECS,
|
||||
Ok(_) if has_all_results => UPDATE_TIMEOUT_WARN_SECS,
|
||||
Err(_) if has_all_results => UPDATE_TIMEOUT_ERR_SECS,
|
||||
_ => UPDATE_TIMEOUT_INCOMPLETE_SECS,
|
||||
}
|
||||
);
|
||||
|
||||
trace!(target: "dapps", "New time drift received: {:?}", res);
|
||||
// Push the result.
|
||||
results.push_back(res.map(|d| d.num_milliseconds()));
|
||||
results.push_back(res);
|
||||
while results.len() > MAX_RESULTS {
|
||||
results.pop_front();
|
||||
}
|
||||
@@ -209,7 +291,7 @@ mod tests {
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::time::Instant;
|
||||
use time::Duration;
|
||||
use futures::{self, BoxFuture, Future};
|
||||
use futures::{future, Future};
|
||||
use super::{Ntp, TimeChecker, Error};
|
||||
use util::RwLock;
|
||||
|
||||
@@ -224,9 +306,11 @@ mod tests {
|
||||
}
|
||||
|
||||
impl Ntp for FakeNtp {
|
||||
fn drift(&self) -> BoxFuture<Duration, Error> {
|
||||
type Future = future::FutureResult<Duration, Error>;
|
||||
|
||||
fn drift(&self) -> Self::Future {
|
||||
self.1.set(self.1.get() + 1);
|
||||
futures::future::ok(self.0.borrow_mut().pop().expect("Unexpected call to drift().")).boxed()
|
||||
future::ok(self.0.borrow_mut().pop().expect("Unexpected call to drift()."))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user