light client: failsafe crate (circuit breaker) (#9790)

* refactor(light): wrap network calls in a `circuit breaker`

* fix(nits): forgot to commit new files

* Add tests and change CLI args

* Address grumbles

* fix(failsafe-rs): sanitize input to prevent panics

* chore(failsafe): bump failsafe (parking_lot)

Bump failsafe to v0.3.0 to enable `parking_lot::Mutex` instead of
`spin::Mutex`

* Remove `success_rate`

* feat(circuit_breaker): logger

* feat(CLI): separate CLI args for request and response

* Fix tests

* Error response provide request kind

When a response times out, provide the actual request or requests that failed

* Update ethcore/light/src/on_demand/mod.rs

Co-Authored-By: niklasad1 <niklasadolfsson1@gmail.com>

* Update ethcore/light/src/on_demand/mod.rs

Co-Authored-By: niklasad1 <niklasadolfsson1@gmail.com>

* fix(grumbles): formatting nit

* fix(grumbles)

* Use second resolution on CLI args
* Use `consecutive failure policy` instead of `timeOverWindow`
* Add a couple of tests for `request_guard`

* fix(request_guard): off-by-one error, update tests
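The heart of the change is failsafe's consecutive-failures policy combined with exponential backoff. Below is a minimal, self-contained sketch of the failsafe v0.3 calls this PR relies on (the constants are illustrative, not the shipped defaults):

use std::time::Duration;
use failsafe::{backoff, failure_policy, StateMachine};

fn main() {
	// Exponential backoff between retries: 1 s, doubling up to a 5 s cap.
	let backoff = backoff::exponential(Duration::from_secs(1), Duration::from_secs(5));
	// Open the circuit after a single consecutive failure.
	let policy = failure_policy::consecutive_failures(1, backoff);
	let breaker = StateMachine::new(policy, ());

	assert!(breaker.is_call_permitted());
	breaker.on_error(); // a network call failed
	// The breaker is now open; further calls are rejected until the backoff elapses.
	assert!(!breaker.is_call_permitted());
}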
Niklas Adolfsson 2018-12-05 10:36:53 +01:00 committed by Afri Schoedon
parent ec886ddefb
commit 7fb33796b1
13 changed files with 652 additions and 158 deletions

Cargo.lock

@ -738,6 +738,7 @@ dependencies = [
"ethcore-network 1.12.0",
"ethcore-transaction 0.1.0",
"ethereum-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"failsafe 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"fastmap 0.1.0",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"hashdb 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1169,6 +1170,16 @@ dependencies = [
"vm 0.1.0",
]
[[package]]
name = "failsafe"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "failure"
version = "0.1.3"
@ -4268,6 +4279,7 @@ dependencies = [
"checksum ethbloom 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1a93a43ce2e9f09071449da36bfa7a1b20b950ee344b6904ff23de493b03b386"
"checksum ethereum-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "35b3c5a18bc5e73a32a110ac743ec04b02bbbcd3b71d3118d40a6113d509378a"
"checksum ethereum-types-serialize 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4ac59a21a9ce98e188f3dace9eb67a6c4a3c67ec7fbc7218cb827852679dc002"
"checksum failsafe 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ad3bf1642583ea2f1fa38a1e8546613a7488816941b33e5f0fccceac61879118"
"checksum failure 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6dd377bcc1b1b7ce911967e3ec24fa19c3224394ec05b54aa7b083d498341ac7"
"checksum failure_derive 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "64c2d913fe8ed3b6c6518eedf4538255b989945c14c2a7d5cbff62a5e2120596"
"checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"

ethcore/light/Cargo.toml

@ -21,6 +21,7 @@ hashdb = "0.3.0"
heapsize = "0.4"
vm = { path = "../vm" }
fastmap = { path = "../../util/fastmap" }
failsafe = { version = "0.3.0", default-features = false, features = ["parking_lot_mutex"] }
rlp = { version = "0.3.0", features = ["ethereum"] }
rlp_derive = { path = "../../util/rlp_derive" }
smallvec = "0.6"

ethcore/light/src/lib.rs

@ -62,6 +62,7 @@ extern crate ethereum_types;
extern crate ethcore;
extern crate hashdb;
extern crate heapsize;
extern crate failsafe;
extern crate futures;
extern crate itertools;
extern crate keccak_hasher;

ethcore/light/src/on_demand/mod.rs

@ -19,44 +19,54 @@
//! will take the raw data received here and extract meaningful results from it.
use std::cmp;
use std::collections::{HashMap, BTreeSet};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use ethcore::executed::{Executed, ExecutionError};
use futures::{Poll, Future, Async};
use futures::sync::oneshot::{self, Receiver};
use network::PeerId;
use parking_lot::{RwLock, Mutex};
use rand;
use std::time::{Duration, SystemTime};
use rand::Rng;
use net::{
self, Handler, PeerStatus, Status, Capabilities,
Handler, PeerStatus, Status, Capabilities,
Announcement, EventContext, BasicContext, ReqId,
};
use cache::Cache;
use request::{self as basic_request, Request as NetworkRequest};
use self::request::CheckedRequest;
pub use self::request::{Request, Response, HeaderRef};
pub use self::request::{Request, Response, HeaderRef, Error as ValidityError};
pub use self::request_guard::{RequestGuard, Error as RequestError};
pub use self::response_guard::{ResponseGuard, Error as ResponseGuardError, Inner as ResponseGuardInner};
pub use types::request::ResponseError;
#[cfg(test)]
mod tests;
pub mod request;
mod request_guard;
mod response_guard;
/// The result of execution
pub type ExecutionResult = Result<Executed, ExecutionError>;
/// The default number of retries for OnDemand queries to send to the other nodes
pub const DEFAULT_RETRY_COUNT: usize = 10;
/// The default time limit in milliseconds for inactive (no new peer to connect to) OnDemand queries (0 for unlimited)
pub const DEFAULT_QUERY_TIME_LIMIT: Duration = Duration::from_millis(10000);
const NULL_DURATION: Duration = Duration::from_secs(0);
/// The initial backoff interval for OnDemand queries
pub const DEFAULT_REQUEST_MIN_BACKOFF_DURATION: Duration = Duration::from_secs(10);
/// The maximum request interval for OnDemand queries
pub const DEFAULT_REQUEST_MAX_BACKOFF_DURATION: Duration = Duration::from_secs(100);
/// The default window length over which a response is evaluated
pub const DEFAULT_RESPONSE_TIME_TO_LIVE: Duration = Duration::from_secs(60);
/// The default number of maximum backoff iterations
pub const DEFAULT_MAX_REQUEST_BACKOFF_ROUNDS: usize = 10;
/// The default number of consecutive failed requests to be regarded as a failure
pub const DEFAULT_NUM_CONSECUTIVE_FAILED_REQUESTS: usize = 1;
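// NOTE: assuming binary exponential backoff (as the request_guard tests suggest: 1, 2, 4,
// then capped), the default series is 10, 20, 40, 80, then 100 s per round; over the
// default 10 rounds a request is retried for roughly 750 seconds before `RequestLimit`.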
/// OnDemand related errors
pub mod error {
@ -69,22 +79,19 @@ pub mod error {
}
errors {
#[doc = "Max number of on-demand query attempts reached without result."]
MaxAttemptReach(query_index: usize) {
description("On-demand query limit reached")
display("On-demand query limit reached on query #{}", query_index)
#[doc = "Timeout bad response"]
BadResponse(err: String) {
description("Max response evaluation time exceeded")
display("{}", err)
}
#[doc = "No reply with current peer set, time out occured while waiting for new peers for additional query attempt."]
TimeoutOnNewPeers(query_index: usize, remaining_attempts: usize) {
description("Timeout for On-demand query")
display("Timeout for On-demand query; {} query attempts remain for query #{}", remaining_attempts, query_index)
#[doc = "OnDemand requests limit exceeded"]
RequestLimit {
description("OnDemand request maximum backoff iterations exceeded")
display("OnDemand request maximum backoff iterations exceeded")
}
}
}
}
// relevant peer info.
@ -113,7 +120,6 @@ impl Peer {
}
}
/// Either an array of responses or a single error.
type PendingResponse = self::error::Result<Vec<Response>>;
@ -124,10 +130,8 @@ struct Pending {
required_capabilities: Capabilities,
responses: Vec<Response>,
sender: oneshot::Sender<PendingResponse>,
base_query_index: usize,
remaining_query_count: usize,
query_id_history: BTreeSet<PeerId>,
inactive_time_limit: Option<SystemTime>,
request_guard: RequestGuard,
response_guard: ResponseGuard,
}
impl Pending {
@ -190,7 +194,7 @@ impl Pending {
fn try_complete(self) -> Option<Self> {
if self.requests.is_complete() {
if self.sender.send(Ok(self.responses)).is_err() {
debug!(target: "on_demand", "Dropped oneshot channel receiver on complete request at query #{}", self.query_id_history.len());
debug!(target: "on_demand", "Dropped oneshot channel receiver on request");
}
None
} else {
@ -227,20 +231,38 @@ impl Pending {
self.required_capabilities = capabilities;
}
// returning no response, it will result in an error.
// self is consumed on purpose.
fn no_response(self) {
trace!(target: "on_demand", "Dropping a pending query (no reply) at query #{}", self.query_id_history.len());
let err = self::error::ErrorKind::MaxAttemptReach(self.requests.num_answered());
// received too many empty responses, which may indicate a faulty request
fn bad_response(self, response_err: ResponseGuardError) {
let reqs: Vec<&str> = self.requests.requests().iter().map(|req| {
match req {
CheckedRequest::HeaderProof(_, _) => "HeaderProof",
CheckedRequest::HeaderByHash(_, _) => "HeaderByHash",
CheckedRequest::HeaderWithAncestors(_, _) => "HeaderWithAncestors",
CheckedRequest::TransactionIndex(_, _) => "TransactionIndex",
CheckedRequest::Receipts(_, _) => "Receipts",
CheckedRequest::Body(_, _) => "Body",
CheckedRequest::Account(_, _) => "Account",
CheckedRequest::Code(_, _) => "Code",
CheckedRequest::Execution(_, _) => "Execution",
CheckedRequest::Signal(_, _) => "Signal",
}
}).collect();
let err = format!("Bad response on {}: [ {} ]. {}",
if reqs.len() > 1 { "requests" } else { "request" },
reqs.join(", "),
response_err
);
let err = self::error::ErrorKind::BadResponse(err);
if self.sender.send(Err(err.into())).is_err() {
debug!(target: "on_demand", "Dropped oneshot channel receiver on no response");
}
}
// returning a peer discovery timeout during query attempts
fn time_out(self) {
trace!(target: "on_demand", "Dropping a pending query (no new peer time out) at query #{}", self.query_id_history.len());
let err = self::error::ErrorKind::TimeoutOnNewPeers(self.requests.num_answered(), self.query_id_history.len());
fn request_limit_reached(self) {
let err = self::error::ErrorKind::RequestLimit;
if self.sender.send(Err(err.into())).is_err() {
debug!(target: "on_demand", "Dropped oneshot channel receiver on time out");
}
@ -326,30 +348,68 @@ pub struct OnDemand {
in_transit: RwLock<HashMap<ReqId, Pending>>,
cache: Arc<Mutex<Cache>>,
no_immediate_dispatch: bool,
base_retry_count: usize,
query_inactive_time_limit: Option<Duration>,
response_time_window: Duration,
request_backoff_start: Duration,
request_backoff_max: Duration,
request_backoff_rounds_max: usize,
request_number_of_consecutive_errors: usize
}
impl OnDemand {
/// Create a new `OnDemand` service with the given cache.
pub fn new(cache: Arc<Mutex<Cache>>) -> Self {
OnDemand {
pub fn new(
cache: Arc<Mutex<Cache>>,
response_time_window: Duration,
request_backoff_start: Duration,
request_backoff_max: Duration,
request_backoff_rounds_max: usize,
request_number_of_consecutive_errors: usize,
) -> Self {
Self {
pending: RwLock::new(Vec::new()),
peers: RwLock::new(HashMap::new()),
in_transit: RwLock::new(HashMap::new()),
cache,
no_immediate_dispatch: false,
base_retry_count: DEFAULT_RETRY_COUNT,
query_inactive_time_limit: Some(DEFAULT_QUERY_TIME_LIMIT),
response_time_window: Self::sanitize_circuit_breaker_input(response_time_window, "Response time window"),
request_backoff_start: Self::sanitize_circuit_breaker_input(request_backoff_start, "Request initial backoff time window"),
request_backoff_max: Self::sanitize_circuit_breaker_input(request_backoff_max, "Request maximum backoff time window"),
request_backoff_rounds_max,
request_number_of_consecutive_errors,
}
}
fn sanitize_circuit_breaker_input(dur: Duration, name: &'static str) -> Duration {
if dur.as_secs() < 1 {
warn!(target: "on_demand",
"{} is too short must be at least 1 second, configuring it to 1 second", name);
Duration::from_secs(1)
} else {
dur
}
}
// make a test version: this doesn't dispatch pending requests
// until you trigger it manually.
#[cfg(test)]
fn new_test(cache: Arc<Mutex<Cache>>) -> Self {
let mut me = OnDemand::new(cache);
fn new_test(
cache: Arc<Mutex<Cache>>,
request_ttl: Duration,
request_backoff_start: Duration,
request_backoff_max: Duration,
request_backoff_rounds_max: usize,
request_number_of_consecutive_errors: usize,
) -> Self {
let mut me = OnDemand::new(
cache,
request_ttl,
request_backoff_start,
request_backoff_max,
request_backoff_rounds_max,
request_number_of_consecutive_errors,
);
me.no_immediate_dispatch = true;
me
@ -403,10 +463,13 @@ impl OnDemand {
required_capabilities: capabilities,
responses,
sender,
base_query_index: 0,
remaining_query_count: 0,
query_id_history: BTreeSet::new(),
inactive_time_limit: None,
request_guard: RequestGuard::new(
self.request_number_of_consecutive_errors as u32,
self.request_backoff_rounds_max,
self.request_backoff_start,
self.request_backoff_max,
),
response_guard: ResponseGuard::new(self.response_time_window),
});
Ok(receiver)
@ -435,82 +498,56 @@ impl OnDemand {
// dispatch pending requests, and discard those for which the corresponding
// receiver has been dropped.
fn dispatch_pending(&self, ctx: &BasicContext) {
if self.pending.read().is_empty() { return }
let mut pending = self.pending.write();
if self.pending.read().is_empty() {
return
}
debug!(target: "on_demand", "Attempting to dispatch {} pending requests", pending.len());
let mut pending = self.pending.write();
// iterate over all pending requests, and check them for hang-up.
// then, try and find a peer who can serve it.
let peers = self.peers.read();
*pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter()
*pending = ::std::mem::replace(&mut *pending, Vec::new())
.into_iter()
.filter(|pending| !pending.sender.is_canceled())
.filter_map(|mut pending| {
// the peer we dispatch to is chosen randomly
let num_peers = peers.len();
let history_len = pending.query_id_history.len();
let offset = if history_len == 0 {
pending.remaining_query_count = self.base_retry_count;
let rand = rand::random::<usize>();
pending.base_query_index = rand;
rand
} else {
pending.base_query_index + history_len
} % cmp::max(num_peers, 1);
let init_remaining_query_count = pending.remaining_query_count; // to fail in case of big reduction of nb of peers
for (peer_id, peer) in peers.iter().chain(peers.iter())
.skip(offset).take(num_peers) {
// TODO: see which requests can be answered by the cache?
if pending.remaining_query_count == 0 {
break
}
if pending.query_id_history.insert(peer_id.clone()) {
let num_peers = peers.len();
// The first peer to dispatch the request to is chosen at random
let rand = rand::thread_rng().gen_range(0, cmp::max(1, num_peers));
for (peer_id, peer) in peers
.iter()
.cycle()
.skip(rand)
.take(num_peers)
{
if !peer.can_fulfill(&pending.required_capabilities) {
trace!(target: "on_demand", "Peer {} without required capabilities, skipping, {} remaining attempts", peer_id, pending.remaining_query_count);
trace!(target: "on_demand", "Peer {} without required capabilities, skipping", peer_id);
continue
}
pending.remaining_query_count -= 1;
pending.inactive_time_limit = None;
match ctx.request_from(*peer_id, pending.net_requests.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Dispatched request {} to peer {}, {} remaining attempts", req_id, peer_id, pending.remaining_query_count);
if pending.request_guard.is_call_permitted() {
if let Ok(req_id) = ctx.request_from(*peer_id, pending.net_requests.clone()) {
self.in_transit.write().insert(req_id, pending);
return None
}
Err(net::Error::NoCredits) | Err(net::Error::NotServer) => {}
Err(e) => debug!(target: "on_demand", "Error dispatching request to peer: {}", e),
return None;
}
}
}
if pending.remaining_query_count == 0 {
pending.no_response();
// Register that the request round failed
if let RequestError::ReachedLimit = pending.request_guard.register_error() {
pending.request_limit_reached();
None
} else if init_remaining_query_count == pending.remaining_query_count {
if let Some(query_inactive_time_limit) = self.query_inactive_time_limit {
let now = SystemTime::now();
if let Some(inactive_time_limit) = pending.inactive_time_limit {
if now > inactive_time_limit {
pending.time_out();
return None
}
} else {
debug!(target: "on_demand", "No more peers to query, waiting for {} seconds until dropping query", query_inactive_time_limit.as_secs());
pending.inactive_time_limit = Some(now + query_inactive_time_limit);
}
}
Some(pending)
} else {
Some(pending)
}
})
.collect(); // `pending` now contains all requests we couldn't dispatch.
.collect(); // `pending` now contains all requests we couldn't dispatch
debug!(target: "on_demand", "Was unable to dispatch {} requests.", pending.len());
trace!(target: "on_demand", "Was unable to dispatch {} requests.", pending.len());
}
// submit a pending request set. attempts to answer from cache before
@ -521,26 +558,14 @@ impl OnDemand {
pending.answer_from_cache(&*self.cache);
if let Some(mut pending) = pending.try_complete() {
// update cached requests
pending.update_net_requests();
// push into `pending` buffer
self.pending.write().push(pending);
// try to dispatch
self.attempt_dispatch(ctx);
}
}
/// Set the retry count for a query.
pub fn default_retry_number(&mut self, nb_retry: usize) {
self.base_retry_count = nb_retry;
}
/// Set the time limit for a query.
pub fn query_inactive_time_limit(&mut self, inactive_time_limit: Duration) {
self.query_inactive_time_limit = if inactive_time_limit == NULL_DURATION {
None
} else {
Some(inactive_time_limit)
};
}
}
impl Handler for OnDemand {
@ -594,13 +619,11 @@ impl Handler for OnDemand {
};
if responses.is_empty() {
if pending.remaining_query_count == 0 {
pending.no_response();
// Max number of `bad` responses reached, drop the request
if let Err(e) = pending.response_guard.register_error(&ResponseError::Validity(ValidityError::Empty)) {
pending.bad_response(e);
return;
}
} else {
// do not keep the query counter for other elements of this batch
pending.query_id_history.clear();
}
// for each incoming response
@ -613,7 +636,11 @@ impl Handler for OnDemand {
debug!(target: "on_demand", "Peer {} gave bad response: {:?}", peer, e);
ctx.disable_peer(peer);
break;
// Max number of `bad` responses reached, drop the request
if let Err(err) = pending.response_guard.register_error(&e) {
pending.bad_response(err);
return;
}
}
}

ethcore/light/src/on_demand/request_guard.rs

@ -0,0 +1,123 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use failsafe;
use std::time::Duration;
type RequestPolicy = failsafe::failure_policy::ConsecutiveFailures<failsafe::backoff::Exponential>;
/// Error wrapped on top of `FailsafeError`
#[derive(Debug, PartialEq)]
pub enum Error {
/// The call is let through
LetThrough,
/// The call was rejected by the guard
Rejected,
/// The request reached the maximum number of backoff iterations
ReachedLimit,
}
/// Handle and register requests that can fail
#[derive(Debug)]
pub struct RequestGuard {
backoff_round: usize,
max_backoff_rounds: usize,
state: failsafe::StateMachine<RequestPolicy, ()>,
}
impl RequestGuard {
/// Constructor
pub fn new(
consecutive_failures: u32,
max_backoff_rounds: usize,
start_backoff: Duration,
max_backoff: Duration,
) -> Self {
let backoff = failsafe::backoff::exponential(start_backoff, max_backoff);
// success_rate not used because only errors are registered
let policy = failsafe::failure_policy::consecutive_failures(consecutive_failures as u32, backoff);
Self {
backoff_round: 0,
max_backoff_rounds,
state: failsafe::StateMachine::new(policy, ()),
}
}
/// Update the state after a `faulty` call
pub fn register_error(&mut self) -> Error {
trace!(target: "circuit_breaker", "RequestGuard; backoff_round: {}/{}, state {:?}",
self.backoff_round, self.max_backoff_rounds, self.state);
if self.backoff_round >= self.max_backoff_rounds {
Error::ReachedLimit
} else if self.state.is_call_permitted() {
self.state.on_error();
if self.state.is_call_permitted() {
Error::LetThrough
} else {
self.backoff_round += 1;
Error::Rejected
}
} else {
Error::Rejected
}
}
/// Poll the circuit breaker, to check if the call is permitted
pub fn is_call_permitted(&self) -> bool {
self.state.is_call_permitted()
}
}
#[cfg(test)]
mod tests {
use std::iter;
use std::time::Instant;
use super::*;
#[test]
fn one_consecutive_failure_with_10_backoffs() {
// 1, 2, 4, 5, 5, ..., 5
let binary_exp_backoff = vec![1_u64, 2, 4].into_iter().chain(iter::repeat(5_u64).take(7));
let mut guard = RequestGuard::new(1, 10, Duration::from_secs(1), Duration::from_secs(5));
for backoff in binary_exp_backoff {
assert_eq!(guard.register_error(), Error::Rejected);
let now = Instant::now();
while now.elapsed() <= Duration::from_secs(backoff) {}
}
assert_eq!(guard.register_error(), Error::ReachedLimit, "10 backoffs should be an error");
}
#[test]
fn five_consecutive_failures_with_3_backoffs() {
let mut guard = RequestGuard::new(5, 3, Duration::from_secs(1), Duration::from_secs(30));
// register four errors that are let through; the fifth opens the breaker
for _ in 0..4 {
assert_eq!(guard.register_error(), Error::LetThrough);
}
let binary_exp_backoff = [1, 2, 4];
for backoff in &binary_exp_backoff {
assert_eq!(guard.register_error(), Error::Rejected);
let now = Instant::now();
while now.elapsed() <= Duration::from_secs(*backoff) {}
}
assert_eq!(guard.register_error(), Error::ReachedLimit, "3 backoffs should be an error");
}
}
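For reference, a sketch of how a caller is expected to drive `RequestGuard` (the `drive` helper and `dispatch` closure are hypothetical; the real logic lives in `dispatch_pending` in `on_demand/mod.rs`):

// hypothetical driver over the RequestGuard API defined above
fn drive(guard: &mut RequestGuard, dispatch: impl Fn() -> bool) {
	if guard.is_call_permitted() {
		if dispatch() {
			return; // the request is in transit
		}
	}
	// the dispatch round failed: register it and decide what to do next
	match guard.register_error() {
		Error::ReachedLimit => { /* give up and drop the request */ }
		Error::Rejected | Error::LetThrough => { /* keep the request pending, retry later */ }
	}
}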

ethcore/light/src/on_demand/response_guard.rs

@ -0,0 +1,174 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! ResponseGuard implementation.
//! It is responsible for the receiving end of a `Pending Request` (see the `OnDemand` module docs for more information).
//! The major functionality is the following:
//! 1) Register non-successful responses, which will be reported back if the request fails
//! 2) A timeout mechanism that waits for a successful response for at most `t` seconds
use std::time::{Duration, Instant};
use std::collections::HashMap;
use std::fmt;
use super::{ResponseError, ValidityError};
/// Response guard error type
#[derive(Debug, Eq, PartialEq)]
pub enum Error {
/// No majority, the error reason can't be determined
NoMajority(usize),
/// Majority, with the error reason
Majority(Inner, usize, usize),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Majority(err, majority, total) => {
write!(f, "Error cause was {:?}, (majority count: {} / total: {})",
err, majority, total)
}
Error::NoMajority(total) => {
write!(f, "Error cause couldn't be determined, the total number of responses was {}", total)
}
}
}
}
/// Dummy type mapping the generic error type into one with no trait bounds
#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
pub enum Inner {
/// Bad execution proof
BadProof,
/// RLP decoding
Decoder,
/// Empty response
EmptyResponse,
/// Wrong header sequence
HeaderByNumber,
/// Too few results
TooFewResults,
/// Too many results
TooManyResults,
/// Trie error
Trie,
/// Unresolved header
UnresolvedHeader,
/// No responses expected.
Unexpected,
/// Wrong hash
WrongHash,
/// Wrong Header sequence
WrongHeaderSequence,
/// Wrong response kind
WrongKind,
/// Wrong number
WrongNumber,
/// Wrong Trie Root
WrongTrieRoot,
}
/// Handle and register responses that can fail
#[derive(Debug)]
pub struct ResponseGuard {
request_start: Instant,
time_to_live: Duration,
responses: HashMap<Inner, usize>,
number_responses: usize,
}
impl ResponseGuard {
/// Constructor
pub fn new(time_to_live: Duration) -> Self {
Self {
request_start: Instant::now(),
time_to_live,
responses: HashMap::new(),
number_responses: 0,
}
}
fn into_reason(&self, err: &ResponseError<super::request::Error>) -> Inner {
match err {
ResponseError::Unexpected => Inner::Unexpected,
ResponseError::Validity(ValidityError::BadProof) => Inner::BadProof,
ResponseError::Validity(ValidityError::Decoder(_)) => Inner::Decoder,
ResponseError::Validity(ValidityError::Empty) => Inner::EmptyResponse,
ResponseError::Validity(ValidityError::HeaderByNumber) => Inner::HeaderByNumber,
ResponseError::Validity(ValidityError::TooFewResults(_, _)) => Inner::TooFewResults,
ResponseError::Validity(ValidityError::TooManyResults(_, _)) => Inner::TooManyResults,
ResponseError::Validity(ValidityError::Trie(_)) => Inner::Trie,
ResponseError::Validity(ValidityError::UnresolvedHeader(_)) => Inner::UnresolvedHeader,
ResponseError::Validity(ValidityError::WrongHash(_, _)) => Inner::WrongHash,
ResponseError::Validity(ValidityError::WrongHeaderSequence) => Inner::WrongHeaderSequence,
ResponseError::Validity(ValidityError::WrongKind) => Inner::WrongKind,
ResponseError::Validity(ValidityError::WrongNumber(_, _)) => Inner::WrongNumber,
ResponseError::Validity(ValidityError::WrongTrieRoot(_, _)) => Inner::WrongTrieRoot,
}
}
/// Update the state after a `faulty` call
pub fn register_error(&mut self, err: &ResponseError<super::request::Error>) -> Result<(), Error> {
let err = self.into_reason(err);
*self.responses.entry(err).or_insert(0) += 1;
self.number_responses = self.number_responses.saturating_add(1);
trace!(target: "circuit_breaker", "ResponseGuard: {:?}", self.responses);
// The request has exceeded its timeout
if self.request_start.elapsed() >= self.time_to_live {
let (&err, &max_count) = self.responses.iter().max_by_key(|(_k, v)| *v).expect("got at least one element; qed");
let majority = self.responses.values().filter(|v| **v == max_count).count() == 1;
if majority {
Err(Error::Majority(err, max_count, self.number_responses))
} else {
Err(Error::NoMajority(self.number_responses))
}
} else {
Ok(())
}
}
}
#[cfg(test)]
mod tests {
use std::thread;
use super::*;
#[test]
fn test_basic_by_majority() {
let mut guard = ResponseGuard::new(Duration::from_secs(5));
guard.register_error(&ResponseError::Validity(ValidityError::Empty)).unwrap();
guard.register_error(&ResponseError::Unexpected).unwrap();
guard.register_error(&ResponseError::Unexpected).unwrap();
guard.register_error(&ResponseError::Unexpected).unwrap();
thread::sleep(Duration::from_secs(5));
assert_eq!(guard.register_error(&ResponseError::Validity(ValidityError::WrongKind)), Err(Error::Majority(Inner::Unexpected, 3, 5)));
}
#[test]
fn test_no_majority() {
let mut guard = ResponseGuard::new(Duration::from_secs(5));
guard.register_error(&ResponseError::Validity(ValidityError::Empty)).unwrap();
guard.register_error(&ResponseError::Validity(ValidityError::Empty)).unwrap();
guard.register_error(&ResponseError::Unexpected).unwrap();
guard.register_error(&ResponseError::Unexpected).unwrap();
thread::sleep(Duration::from_secs(5));
assert_eq!(guard.register_error(&ResponseError::Validity(ValidityError::WrongKind)), Err(Error::NoMajority(5)));
}
}
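A note on the design: returning the majority error kind (rather than the most recent one) surfaces the most common failure cause observed over the whole evaluation window, which is more likely to reflect the actual fault than whichever bad response happened to arrive last.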

ethcore/light/src/on_demand/tests.rs

@ -23,10 +23,11 @@ use network::{PeerId, NodeId};
use net::*;
use ethereum_types::H256;
use parking_lot::Mutex;
use std::time::Duration;
use ::request::{self as basic_request, Response};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;
use super::{request, OnDemand, Peer, HeaderRef};
@ -36,6 +37,7 @@ enum Context {
WithPeer(PeerId),
RequestFrom(PeerId, ReqId),
Punish(PeerId),
FaultyRequest,
}
impl EventContext for Context {
@ -44,6 +46,7 @@ impl EventContext for Context {
Context::WithPeer(id)
| Context::RequestFrom(id, _)
| Context::Punish(id) => id,
| Context::FaultyRequest => 0,
_ => panic!("didn't expect to have peer queried."),
}
}
@ -60,6 +63,7 @@ impl BasicContext for Context {
fn request_from(&self, peer_id: PeerId, _: ::request::NetworkRequests) -> Result<ReqId, Error> {
match *self {
Context::RequestFrom(id, req_id) => if peer_id == id { Ok(req_id) } else { Err(Error::NoCredits) },
Context::FaultyRequest => Err(Error::NoCredits),
_ => panic!("didn't expect to have requests dispatched."),
}
}
@ -89,7 +93,17 @@ impl Harness {
fn create() -> Self {
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::from_secs(60))));
Harness {
service: OnDemand::new_test(cache),
service: OnDemand::new_test(
cache,
// Response `time_to_live`
Duration::from_secs(5),
// Request start backoff
Duration::from_secs(1),
// Request max backoff
Duration::from_secs(20),
super::DEFAULT_MAX_REQUEST_BACKOFF_ROUNDS,
super::DEFAULT_NUM_CONSECUTIVE_FAILED_REQUESTS
)
}
}
@ -495,3 +509,90 @@ fn fill_from_cache() {
assert!(recv.wait().is_ok());
}
#[test]
fn request_without_response_should_backoff_and_then_be_dropped() {
let harness = Harness::create();
let peer_id = 0;
let req_id = ReqId(13);
harness.inject_peer(
peer_id,
Peer {
status: dummy_status(),
capabilities: dummy_capabilities(),
}
);
let binary_exp_backoff: Vec<u64> = vec![1, 2, 4, 8, 16, 20, 20, 20, 20, 20];
let _recv = harness.service.request_raw(
&Context::RequestFrom(peer_id, req_id),
vec![request::HeaderByHash(Header::default().encoded().hash().into()).into()],
).unwrap();
assert_eq!(harness.service.pending.read().len(), 1);
for backoff in &binary_exp_backoff {
harness.service.dispatch_pending(&Context::FaultyRequest);
assert_eq!(harness.service.pending.read().len(), 1, "Request should not be dropped");
let now = Instant::now();
while now.elapsed() < Duration::from_secs(*backoff) {}
}
harness.service.dispatch_pending(&Context::FaultyRequest);
assert_eq!(harness.service.pending.read().len(), 0, "Request that exceeded the 10 backoff rounds should be dropped");
}
#[test]
fn empty_responses_exceeds_limit_should_be_dropped() {
let harness = Harness::create();
let peer_id = 0;
let req_id = ReqId(13);
harness.inject_peer(
peer_id,
Peer {
status: dummy_status(),
capabilities: dummy_capabilities(),
}
);
let _recv = harness.service.request_raw(
&Context::RequestFrom(peer_id, req_id),
vec![request::HeaderByHash(Header::default().encoded().hash().into()).into()],
).unwrap();
harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_id));
assert_eq!(harness.service.pending.read().len(), 0);
assert_eq!(harness.service.in_transit.read().len(), 1);
let now = Instant::now();
// Send `empty responses` in the current time window
// Use only half of the `time_window` because we can't be sure exactly
// when the window started, and because of limited clock accuracy
while now.elapsed() < harness.service.response_time_window / 2 {
harness.service.on_responses(
&Context::RequestFrom(13, req_id),
req_id,
&[]
);
assert!(harness.service.pending.read().len() != 0);
let pending = harness.service.pending.write().remove(0);
harness.service.in_transit.write().insert(req_id, pending);
}
// Make sure we passed the first `time window`
thread::sleep(Duration::from_secs(5));
// Now, the response guard is in the failure state but needs one more response to be `polled`
harness.service.on_responses(
&Context::RequestFrom(13, req_id),
req_id,
&[]
);
assert!(harness.service.in_transit.read().is_empty());
assert!(harness.service.pending.read().is_empty());
}

parity/cli/mod.rs

@ -579,13 +579,25 @@ usage! {
"Specify CORS header for IPFS API responses. Special options: \"all\", \"none\".",
["Light Client Options"]
ARG arg_on_demand_retry_count: (Option<usize>) = None, or |c: &Config| c.light.as_ref()?.on_demand_retry_count,
"--on-demand-retry-count=[RETRIES]",
"Specify the query retry count.",
ARG arg_on_demand_response_time_window: (Option<u64>) = None, or |c: &Config| c.light.as_ref()?.on_demand_response_time_window,
"--on-demand-time-window=[S]",
"Specify the maximum time to wait for a successful response",
ARG arg_on_demand_inactive_time_limit: (Option<u64>) = None, or |c: &Config| c.light.as_ref()?.on_demand_inactive_time_limit,
"--on-demand-inactive-time-limit=[MS]",
"Specify light client query inactive time limit. O for no limit.",
ARG arg_on_demand_request_backoff_start: (Option<u64>) = None, or |c: &Config| c.light.as_ref()?.on_demand_request_backoff_start,
"--on-demand-start-backoff=[S]",
"Specify light client initial backoff time for a request",
ARG arg_on_demand_request_backoff_max: (Option<u64>) = None, or |c: &Config| c.light.as_ref()?.on_demand_request_backoff_max,
"--on-demand-end-backoff=[S]",
"Specify light client maximum backoff time for a request",
ARG arg_on_demand_request_backoff_rounds_max: (Option<usize>) = None, or |c: &Config| c.light.as_ref()?.on_demand_request_backoff_rounds_max,
"--on-demand-max-backoff-rounds=[TIMES]",
"Specify light client maximum number of backoff iterations for a request",
ARG arg_on_demand_request_consecutive_failures: (Option<usize>) = None, or |c: &Config| c.light.as_ref()?.on_demand_request_consecutive_failures,
"--on-demand-consecutive-failures=[TIMES]",
"Specify light client the number of failures for a request until it gets exponentially backed off",
["Secret Store Options"]
FLAG flag_no_secretstore: (bool) = false, or |c: &Config| c.secretstore.as_ref()?.disable.clone(),
@ -1402,8 +1414,11 @@ struct Whisper {
#[derive(Default, Debug, PartialEq, Deserialize)]
#[serde(deny_unknown_fields)]
struct Light {
on_demand_retry_count: Option<usize>,
on_demand_inactive_time_limit: Option<u64>,
on_demand_response_time_window: Option<u64>,
on_demand_request_backoff_start: Option<u64>,
on_demand_request_backoff_max: Option<u64>,
on_demand_request_backoff_rounds_max: Option<usize>,
on_demand_request_consecutive_failures: Option<usize>,
}
#[cfg(test)]
@ -1820,8 +1835,11 @@ mod tests {
arg_snapshot_threads: None,
// -- Light options.
arg_on_demand_retry_count: Some(15),
arg_on_demand_inactive_time_limit: Some(15000),
arg_on_demand_response_time_window: Some(2000),
arg_on_demand_request_backoff_start: Some(9000),
arg_on_demand_request_backoff_max: Some(15000),
arg_on_demand_request_backoff_rounds_max: Some(100),
arg_on_demand_request_consecutive_failures: Some(1),
// -- Whisper options.
flag_whisper: false,
@ -2075,8 +2093,11 @@ mod tests {
num_verifiers: None,
}),
light: Some(Light {
on_demand_retry_count: Some(12),
on_demand_inactive_time_limit: Some(20000),
on_demand_response_time_window: Some(2000),
on_demand_request_backoff_start: Some(9000),
on_demand_request_backoff_max: Some(15000),
on_demand_request_backoff_rounds_max: Some(10),
on_demand_request_consecutive_failures: Some(1),
}),
snapshots: Some(Snapshots {
disable_periodic: Some(true),

parity/cli/tests/config.full.toml

@ -157,8 +157,11 @@ scale_verifiers = true
num_verifiers = 6
[light]
on_demand_retry_count = 15
on_demand_inactive_time_limit = 15000
on_demand_response_time_window = 2000
on_demand_request_backoff_start = 9000
on_demand_request_backoff_max = 15000
on_demand_request_backoff_rounds_max = 100
on_demand_request_consecutive_failures = 1
[snapshots]
disable_periodic = false

parity/cli/tests/config.toml

@ -71,8 +71,11 @@ fat_db = "off"
scale_verifiers = false
[light]
on_demand_retry_count = 12
on_demand_inactive_time_limit = 20000
on_demand_response_time_window = 2000
on_demand_request_backoff_start = 9000
on_demand_request_backoff_max = 15000
on_demand_request_backoff_rounds_max = 10
on_demand_request_consecutive_failures = 1
[snapshots]
disable_periodic = true

parity/configuration.rs

@ -397,8 +397,11 @@ impl Configuration {
whisper: whisper_config,
no_hardcoded_sync: self.args.flag_no_hardcoded_sync,
max_round_blocks_to_import: self.args.arg_max_round_blocks_to_import,
on_demand_retry_count: self.args.arg_on_demand_retry_count,
on_demand_inactive_time_limit: self.args.arg_on_demand_inactive_time_limit,
on_demand_response_time_window: self.args.arg_on_demand_response_time_window,
on_demand_request_backoff_start: self.args.arg_on_demand_request_backoff_start,
on_demand_request_backoff_max: self.args.arg_on_demand_request_backoff_max,
on_demand_request_backoff_rounds_max: self.args.arg_on_demand_request_backoff_rounds_max,
on_demand_request_consecutive_failures: self.args.arg_on_demand_request_consecutive_failures,
};
Cmd::Run(run_cmd)
};
@ -1449,8 +1452,11 @@ mod tests {
no_persistent_txqueue: false,
whisper: Default::default(),
max_round_blocks_to_import: 12,
on_demand_retry_count: None,
on_demand_inactive_time_limit: None,
on_demand_response_time_window: None,
on_demand_request_backoff_start: None,
on_demand_request_backoff_max: None,
on_demand_request_backoff_rounds_max: None,
on_demand_request_consecutive_failures: None,
};
expected.secretstore_conf.enabled = cfg!(feature = "secretstore");
expected.secretstore_conf.http_enabled = cfg!(feature = "secretstore");

parity/run.rs

@ -136,8 +136,11 @@ pub struct RunCmd {
pub whisper: ::whisper::Config,
pub no_hardcoded_sync: bool,
pub max_round_blocks_to_import: usize,
pub on_demand_retry_count: Option<usize>,
pub on_demand_inactive_time_limit: Option<u64>,
pub on_demand_response_time_window: Option<u64>,
pub on_demand_request_backoff_start: Option<u64>,
pub on_demand_request_backoff_max: Option<u64>,
pub on_demand_request_backoff_rounds_max: Option<usize>,
pub on_demand_request_consecutive_failures: Option<usize>,
}
// node info fetcher for the local store.
@ -216,12 +219,31 @@ fn execute_light_impl(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<Runnin
config.queue.verifier_settings = cmd.verifier_settings;
// start on_demand service.
let response_time_window = cmd.on_demand_response_time_window.map_or(
::light::on_demand::DEFAULT_RESPONSE_TIME_TO_LIVE,
|s| Duration::from_secs(s)
);
let request_backoff_start = cmd.on_demand_request_backoff_start.map_or(
::light::on_demand::DEFAULT_REQUEST_MIN_BACKOFF_DURATION,
|s| Duration::from_secs(s)
);
let request_backoff_max = cmd.on_demand_request_backoff_max.map_or(
::light::on_demand::DEFAULT_REQUEST_MAX_BACKOFF_DURATION,
|s| Duration::from_secs(s)
);
let on_demand = Arc::new({
let mut on_demand = ::light::on_demand::OnDemand::new(cache.clone());
on_demand.default_retry_number(cmd.on_demand_retry_count.unwrap_or(::light::on_demand::DEFAULT_RETRY_COUNT));
on_demand.query_inactive_time_limit(cmd.on_demand_inactive_time_limit.map(Duration::from_millis)
.unwrap_or(::light::on_demand::DEFAULT_QUERY_TIME_LIMIT));
on_demand
::light::on_demand::OnDemand::new(
cache.clone(),
response_time_window,
request_backoff_start,
request_backoff_max,
cmd.on_demand_request_backoff_rounds_max.unwrap_or(::light::on_demand::DEFAULT_MAX_REQUEST_BACKOFF_ROUNDS),
cmd.on_demand_request_consecutive_failures.unwrap_or(::light::on_demand::DEFAULT_NUM_CONSECUTIVE_FAILED_REQUESTS)
)
});
let sync_handle = Arc::new(RwLock::new(Weak::new()));

rpc/src/v1/helpers/errors.rs

@ -523,8 +523,8 @@ pub fn filter_block_not_found(id: BlockId) -> Error {
pub fn on_demand_error(err: OnDemandError) -> Error {
match err {
OnDemandError(OnDemandErrorKind::ChannelCanceled(e), _) => on_demand_cancel(e),
OnDemandError(OnDemandErrorKind::MaxAttemptReach(_), _) => max_attempts_reached(&err),
OnDemandError(OnDemandErrorKind::TimeoutOnNewPeers(_,_), _) => timeout_new_peer(&err),
OnDemandError(OnDemandErrorKind::RequestLimit, _) => timeout_new_peer(&err),
OnDemandError(OnDemandErrorKind::BadResponse(_), _) => max_attempts_reached(&err),
_ => on_demand_others(&err),
}
}