More code refactoring to integrate Duration (#8322)

* More code refactoring to integrate Duration

* Fix typo

* Fix tests

* More test fix
This commit is contained in:
Pierre Krieger 2018-04-14 21:35:58 +02:00 committed by Marek Kotewicz
parent 90eb61091a
commit fac356c701
28 changed files with 185 additions and 147 deletions

View File

@ -34,8 +34,8 @@ impl FetchControl {
}
pub fn wait_for_requests(&self, len: usize) {
const MAX_TIMEOUT_MS: u64 = 5000;
const ATTEMPTS: u64 = 10;
const MAX_TIMEOUT: time::Duration = time::Duration::from_millis(5000);
const ATTEMPTS: u32 = 10;
let mut attempts_left = ATTEMPTS;
loop {
let current = self.fetch.requested.lock().len();
@ -50,7 +50,7 @@ impl FetchControl {
} else {
attempts_left -= 1;
// Should we handle spurious timeouts better?
thread::park_timeout(time::Duration::from_millis(MAX_TIMEOUT_MS / ATTEMPTS));
thread::park_timeout(MAX_TIMEOUT / ATTEMPTS);
}
}
}

View File

@ -48,11 +48,11 @@ pub trait SampleStore: Send + Sync {
}
// get a hardcoded, arbitrarily determined (but intended overestimate)
// of the time in nanoseconds to serve a request of the given kind.
// of the time it takes to serve a request of the given kind.
//
// TODO: seed this with empirical data.
fn hardcoded_serve_time(kind: Kind) -> u64 {
match kind {
fn hardcoded_serve_time(kind: Kind) -> Duration {
Duration::new(0, match kind {
Kind::Headers => 500_000,
Kind::HeaderProof => 500_000,
Kind::TransactionIndex => 500_000,
@ -63,7 +63,7 @@ fn hardcoded_serve_time(kind: Kind) -> u64 {
Kind::Code => 1_500_000,
Kind::Execution => 250, // per gas.
Kind::Signal => 500_000,
}
})
}
/// A no-op store.
@ -114,10 +114,10 @@ impl LoadDistribution {
}
}
/// Calculate EMA of load in nanoseconds for a specific request kind.
/// Calculate EMA of load for a specific request kind.
/// If there is no data for the given request kind, no EMA will be calculated,
/// but a hardcoded time will be returned.
pub fn expected_time_ns(&self, kind: Kind) -> u64 {
pub fn expected_time(&self, kind: Kind) -> Duration {
let samples = self.samples.read();
samples.get(&kind).and_then(|s| {
if s.len() == 0 { return None }
@ -128,7 +128,9 @@ impl LoadDistribution {
(alpha * c as f64) + ((1.0 - alpha) * a)
});
Some(ema as u64)
// TODO: use `Duration::from_nanos` once stable (https://github.com/rust-lang/rust/issues/46507)
let ema = ema as u64;
Some(Duration::new(ema / 1_000_000_000, (ema % 1_000_000_000) as u32))
}).unwrap_or_else(move || hardcoded_serve_time(kind))
}
@ -223,12 +225,12 @@ mod tests {
#[test]
fn hardcoded_before_data() {
let dist = LoadDistribution::load(&NullStore);
assert_eq!(dist.expected_time_ns(Kind::Headers), hardcoded_serve_time(Kind::Headers));
assert_eq!(dist.expected_time(Kind::Headers), hardcoded_serve_time(Kind::Headers));
dist.update(Kind::Headers, Duration::new(0, 100_000), 100);
dist.end_period(&NullStore);
assert_eq!(dist.expected_time_ns(Kind::Headers), 1000);
assert_eq!(dist.expected_time(Kind::Headers), Duration::new(0, 1000));
}
#[test]
@ -244,20 +246,20 @@ mod tests {
sum += x;
if i == 0 { continue }
let moving_average = dist.expected_time_ns(Kind::Headers);
let moving_average = dist.expected_time(Kind::Headers);
// should be weighted below the maximum entry.
let arith_average = (sum as f64 / (i + 1) as f64) as u64;
assert!(moving_average < x as u64);
let arith_average = (sum as f64 / (i + 1) as f64) as u32;
assert!(moving_average < Duration::new(0, x));
// when there are only 2 entries, they should be equal due to choice of
// ALPHA = 1/N.
// otherwise, the weight should be below the arithmetic mean because the much
// smaller previous values are discounted less.
if i == 1 {
assert_eq!(moving_average, arith_average);
assert_eq!(moving_average, Duration::new(0, arith_average));
} else {
assert!(moving_average < arith_average)
assert!(moving_average < Duration::new(0, arith_average))
}
}
}

View File

@ -61,16 +61,16 @@ pub use self::load_timer::{SampleStore, FileStore};
pub use self::status::{Status, Capabilities, Announcement};
const TIMEOUT: TimerToken = 0;
const TIMEOUT_INTERVAL_MS: u64 = 1000;
const TIMEOUT_INTERVAL: Duration = Duration::from_secs(1);
const TICK_TIMEOUT: TimerToken = 1;
const TICK_TIMEOUT_INTERVAL_MS: u64 = 5000;
const TICK_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
const PROPAGATE_TIMEOUT: TimerToken = 2;
const PROPAGATE_TIMEOUT_INTERVAL_MS: u64 = 5000;
const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3;
const RECALCULATE_COSTS_INTERVAL_MS: u64 = 60 * 60 * 1000;
const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);
// minimum interval between updates.
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);
@ -369,9 +369,9 @@ impl LightProtocol {
let sample_store = params.sample_store.unwrap_or_else(|| Box::new(NullStore));
let load_distribution = LoadDistribution::load(&*sample_store);
let flow_params = FlowParams::from_request_times(
|kind| load_distribution.expected_time_ns(kind),
|kind| load_distribution.expected_time(kind),
params.config.load_share,
params.config.max_stored_seconds,
Duration::from_secs(params.config.max_stored_seconds),
);
LightProtocol {
@ -766,9 +766,9 @@ impl LightProtocol {
self.load_distribution.end_period(&*self.sample_store);
let new_params = Arc::new(FlowParams::from_request_times(
|kind| self.load_distribution.expected_time_ns(kind),
|kind| self.load_distribution.expected_time(kind),
self.config.load_share,
self.config.max_stored_seconds,
Duration::from_secs(self.config.max_stored_seconds),
));
*self.flow_params.write() = new_params.clone();
@ -1080,13 +1080,13 @@ fn punish(peer: PeerId, io: &IoContext, e: Error) {
impl NetworkProtocolHandler for LightProtocol {
fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS)
io.register_timer(TIMEOUT, TIMEOUT_INTERVAL)
.expect("Error registering sync timer.");
io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL_MS)
io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL)
.expect("Error registering sync timer.");
io.register_timer(PROPAGATE_TIMEOUT, PROPAGATE_TIMEOUT_INTERVAL_MS)
io.register_timer(PROPAGATE_TIMEOUT, PROPAGATE_TIMEOUT_INTERVAL)
.expect("Error registering sync timer.");
io.register_timer(RECALCULATE_COSTS_TIMEOUT, RECALCULATE_COSTS_INTERVAL_MS)
io.register_timer(RECALCULATE_COSTS_TIMEOUT, RECALCULATE_COSTS_INTERVAL)
.expect("Error registering request timer interval token.");
}

View File

@ -235,23 +235,30 @@ impl FlowParams {
/// Create new flow parameters from ,
/// proportion of total capacity which should be given to a peer,
/// and number of seconds of stored capacity a peer can accumulate.
pub fn from_request_times<F: Fn(::request::Kind) -> u64>(
request_time_ns: F,
/// and stored capacity a peer can accumulate.
pub fn from_request_times<F: Fn(::request::Kind) -> Duration>(
request_time: F,
load_share: f64,
max_stored_seconds: u64
max_stored: Duration
) -> Self {
use request::Kind;
let load_share = load_share.abs();
let recharge: u64 = 100_000_000;
let max = recharge.saturating_mul(max_stored_seconds);
let max = {
let sec = max_stored.as_secs().saturating_mul(recharge);
let nanos = (max_stored.subsec_nanos() as u64).saturating_mul(recharge) / 1_000_000_000;
sec + nanos
};
let cost_for_kind = |kind| {
// how many requests we can handle per second
let ns = request_time_ns(kind);
let second_duration = 1_000_000_000f64 / ns as f64;
let rq_dur = request_time(kind);
let second_duration = {
let as_ns = rq_dur.as_secs() as f64 * 1_000_000_000f64 + rq_dur.subsec_nanos() as f64;
1_000_000_000f64 / as_ns
};
// scale by share of the load given to this peer.
let serve_per_second = second_duration * load_share;
@ -426,21 +433,21 @@ mod tests {
#[test]
fn scale_by_load_share_and_time() {
let flow_params = FlowParams::from_request_times(
|_| 10_000,
|_| Duration::new(0, 10_000),
0.05,
60,
Duration::from_secs(60),
);
let flow_params2 = FlowParams::from_request_times(
|_| 10_000,
|_| Duration::new(0, 10_000),
0.1,
60,
Duration::from_secs(60),
);
let flow_params3 = FlowParams::from_request_times(
|_| 5_000,
|_| Duration::new(0, 5_000),
0.05,
60,
Duration::from_secs(60),
);
assert_eq!(flow_params2.costs, flow_params3.costs);

View File

@ -18,6 +18,7 @@
use std::sync::Arc;
use std::path::Path;
use std::time::Duration;
use ansi_term::Colour;
use io::{IoContext, TimerToken, IoHandler, IoService, IoError};
@ -180,13 +181,13 @@ struct ClientIoHandler {
const CLIENT_TICK_TIMER: TimerToken = 0;
const SNAPSHOT_TICK_TIMER: TimerToken = 1;
const CLIENT_TICK_MS: u64 = 5000;
const SNAPSHOT_TICK_MS: u64 = 10000;
const CLIENT_TICK: Duration = Duration::from_secs(5);
const SNAPSHOT_TICK: Duration = Duration::from_secs(10);
impl IoHandler<ClientIoMessage> for ClientIoHandler {
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer");
io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK_MS).expect("Error registering snapshot timer");
io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK).expect("Error registering client timer");
io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK).expect("Error registering snapshot timer");
}
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {

View File

@ -638,8 +638,8 @@ impl AccountProvider {
}
/// Unlocks account temporarily with a timeout.
pub fn unlock_account_timed(&self, account: Address, password: String, duration_ms: u32) -> Result<(), Error> {
self.unlock_account(account, password, Unlock::Timed(Instant::now() + Duration::from_millis(duration_ms as u64)))
pub fn unlock_account_timed(&self, account: Address, password: String, duration: Duration) -> Result<(), Error> {
self.unlock_account(account, password, Unlock::Timed(Instant::now() + duration))
}
/// Checks if given account is unlocked
@ -834,7 +834,7 @@ impl AccountProvider {
#[cfg(test)]
mod tests {
use super::{AccountProvider, Unlock, DappId};
use std::time::Instant;
use std::time::{Duration, Instant};
use ethstore::ethkey::{Generator, Random, Address};
use ethstore::{StoreAccountRef, Derivation};
use ethereum_types::H256;
@ -938,8 +938,8 @@ mod tests {
let kp = Random.generate().unwrap();
let ap = AccountProvider::transient_provider();
assert!(ap.insert_account(kp.secret().clone(), "test").is_ok());
assert!(ap.unlock_account_timed(kp.address(), "test1".into(), 60000).is_err());
assert!(ap.unlock_account_timed(kp.address(), "test".into(), 60000).is_ok());
assert!(ap.unlock_account_timed(kp.address(), "test1".into(), Duration::from_secs(60)).is_err());
assert!(ap.unlock_account_timed(kp.address(), "test".into(), Duration::from_secs(60)).is_ok());
assert!(ap.sign(kp.address(), None, Default::default()).is_ok());
ap.unlocked.write().get_mut(&StoreAccountRef::root(kp.address())).unwrap().unlock = Unlock::Timed(Instant::now());
assert!(ap.sign(kp.address(), None, Default::default()).is_err());

View File

@ -19,7 +19,7 @@
use std::fmt;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Weak, Arc};
use std::time::{UNIX_EPOCH, Duration};
use std::time::{UNIX_EPOCH, SystemTime, Duration};
use std::collections::{BTreeMap, HashSet};
use std::iter::FromIterator;
@ -536,6 +536,7 @@ fn verify_timestamp(step: &Step, header_step: usize) -> Result<(), BlockError> {
// NOTE This error might be returned only in early stage of verification (Stage 1).
// Returning it further won't recover the sync process.
trace!(target: "engine", "verify_timestamp: block too early");
let oob = oob.map(|n| SystemTime::now() + Duration::from_secs(n));
Err(BlockError::TemporarilyInvalid(oob).into())
},
Ok(_) => Ok(()),
@ -694,8 +695,8 @@ const ENGINE_TIMEOUT_TOKEN: TimerToken = 23;
impl IoHandler<()> for TransitionHandler {
fn initialize(&self, io: &IoContext<()>) {
if let Some(engine) = self.engine.upgrade() {
let remaining = engine.step.duration_remaining();
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, remaining.as_millis())
let remaining = engine.step.duration_remaining().as_millis();
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e))
}
}
@ -711,7 +712,7 @@ impl IoHandler<()> for TransitionHandler {
}
let next_run_at = engine.step.duration_remaining().as_millis() >> 2;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, next_run_at)
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e))
}
}

View File

@ -51,8 +51,7 @@ impl<S, M: Machine> TransitionHandler<S, M> where S: Sync + Send + Clone {
pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 23;
fn set_timeout<S: Sync + Send + Clone>(io: &IoContext<S>, timeout: Duration) {
let ms = timeout.as_secs() * 1_000 + timeout.subsec_nanos() as u64 / 1_000_000;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, ms)
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, timeout)
.unwrap_or_else(|e| warn!(target: "engine", "Failed to set consensus step timeout: {}.", e))
}

View File

@ -17,6 +17,7 @@
//! General error types for use in ethcore.
use std::{fmt, error};
use std::time::SystemTime;
use kvdb;
use ethereum_types::{H256, U256, Address, Bloom};
use util_error::UtilError;
@ -81,9 +82,9 @@ pub enum BlockError {
/// Receipts trie root header field is invalid.
InvalidReceiptsRoot(Mismatch<H256>),
/// Timestamp header field is invalid.
InvalidTimestamp(OutOfBounds<u64>),
InvalidTimestamp(OutOfBounds<SystemTime>),
/// Timestamp header field is too far in future.
TemporarilyInvalid(OutOfBounds<u64>),
TemporarilyInvalid(OutOfBounds<SystemTime>),
/// Log bloom header field is invalid.
InvalidLogBloom(Mismatch<Bloom>),
/// Number field of header is invalid.
@ -125,8 +126,14 @@ impl fmt::Display for BlockError {
InvalidSeal => "Block has invalid seal.".into(),
InvalidGasLimit(ref oob) => format!("Invalid gas limit: {}", oob),
InvalidReceiptsRoot(ref mis) => format!("Invalid receipts trie root in header: {}", mis),
InvalidTimestamp(ref oob) => format!("Invalid timestamp in header: {}", oob),
TemporarilyInvalid(ref oob) => format!("Future timestamp in header: {}", oob),
InvalidTimestamp(ref oob) => {
let oob = oob.map(|st| st.elapsed().unwrap_or_default().as_secs());
format!("Invalid timestamp in header: {}", oob)
},
TemporarilyInvalid(ref oob) => {
let oob = oob.map(|st| st.elapsed().unwrap_or_default().as_secs());
format!("Future timestamp in header: {}", oob)
},
InvalidLogBloom(ref oob) => format!("Invalid log bloom in header: {}", oob),
InvalidNumber(ref mis) => format!("Invalid number in header: {}", mis),
RidiculousNumber(ref oob) => format!("Implausible block number. {}", oob),

View File

@ -22,7 +22,7 @@
//! 3. Final verification against the blockchain done before enactment.
use std::collections::HashSet;
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use bytes::Bytes;
use ethereum_types::H256;
@ -284,11 +284,10 @@ pub fn verify_header_params(header: &Header, engine: &EthEngine, is_full: bool)
}
if is_full {
const ACCEPTABLE_DRIFT_SECS: u64 = 15;
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default();
let max_time = now.as_secs() + ACCEPTABLE_DRIFT_SECS;
let invalid_threshold = max_time + ACCEPTABLE_DRIFT_SECS * 9;
let timestamp = header.timestamp();
const ACCEPTABLE_DRIFT: Duration = Duration::from_secs(15);
let max_time = SystemTime::now() + ACCEPTABLE_DRIFT;
let invalid_threshold = max_time + ACCEPTABLE_DRIFT * 9;
let timestamp = UNIX_EPOCH + Duration::from_secs(header.timestamp());
if timestamp > invalid_threshold {
return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: Some(max_time), min: None, found: timestamp })))
@ -310,7 +309,9 @@ fn verify_parent(header: &Header, parent: &Header, engine: &EthEngine) -> Result
let gas_limit_divisor = engine.params().gas_limit_bound_divisor;
if !engine.is_timestamp_valid(header.timestamp(), parent.timestamp()) {
return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: None, min: Some(parent.timestamp() + 1), found: header.timestamp() })))
let min = SystemTime::now() + Duration::from_secs(parent.timestamp() + 1);
let found = SystemTime::now() + Duration::from_secs(header.timestamp());
return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: None, min: Some(min), found })))
}
if header.number() != parent.number() + 1 {
return Err(From::from(BlockError::InvalidNumber(Mismatch { expected: parent.number() + 1, found: header.number() })));
@ -679,8 +680,7 @@ mod tests {
header = good.clone();
header.set_timestamp(10);
check_fail(family_test(&create_test_block_with_data(&header, &good_transactions, &good_uncles), engine, &bc),
InvalidTimestamp(OutOfBounds { max: None, min: Some(parent.timestamp() + 1), found: header.timestamp() }));
check_fail_timestamp(family_test(&create_test_block_with_data(&header, &good_transactions, &good_uncles), engine, &bc), false);
header = good.clone();
header.set_timestamp(2450000000);

View File

@ -17,6 +17,7 @@
use std::sync::Arc;
use std::collections::{HashMap, BTreeMap};
use std::io;
use std::time::Duration;
use bytes::Bytes;
use devp2p::{NetworkService, ConnectionFilter};
use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId,
@ -373,7 +374,7 @@ struct SyncProtocolHandler {
impl NetworkProtocolHandler for SyncProtocolHandler {
fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
io.register_timer(0, 1000).expect("Error registering sync timer");
io.register_timer(0, Duration::from_secs(1)).expect("Error registering sync timer");
}
}

View File

@ -91,7 +91,7 @@
use std::sync::Arc;
use std::collections::{HashSet, HashMap};
use std::cmp;
use std::time::Instant;
use std::time::{Duration, Instant};
use hash::keccak;
use heapsize::HeapSizeOf;
use ethereum_types::{H256, U256};
@ -177,14 +177,14 @@ pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x18;
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
const WAIT_PEERS_TIMEOUT_SEC: u64 = 5;
const STATUS_TIMEOUT_SEC: u64 = 5;
const HEADERS_TIMEOUT_SEC: u64 = 15;
const BODIES_TIMEOUT_SEC: u64 = 20;
const RECEIPTS_TIMEOUT_SEC: u64 = 10;
const FORK_HEADER_TIMEOUT_SEC: u64 = 3;
const SNAPSHOT_MANIFEST_TIMEOUT_SEC: u64 = 5;
const SNAPSHOT_DATA_TIMEOUT_SEC: u64 = 120;
const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5);
const STATUS_TIMEOUT: Duration = Duration::from_secs(5);
const HEADERS_TIMEOUT: Duration = Duration::from_secs(15);
const BODIES_TIMEOUT: Duration = Duration::from_secs(20);
const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10);
const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3);
const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5);
const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120);
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Sync state
@ -573,7 +573,7 @@ impl ChainSync {
(best_hash, max_peers, snapshot_peers)
};
let timeout = (self.state == SyncState::WaitingPeers) && self.sync_start_time.map_or(false, |t| t.elapsed().as_secs() > WAIT_PEERS_TIMEOUT_SEC);
let timeout = (self.state == SyncState::WaitingPeers) && self.sync_start_time.map_or(false, |t| t.elapsed() > WAIT_PEERS_TIMEOUT);
if let (Some(hash), Some(peers)) = (best_hash, best_hash.map_or(None, |h| snapshot_peers.get(&h))) {
if max_peers >= SNAPSHOT_MIN_PEERS {
@ -1825,15 +1825,15 @@ impl ChainSync {
let tick = Instant::now();
let mut aborting = Vec::new();
for (peer_id, peer) in &self.peers {
let elapsed = (tick - peer.ask_time).as_secs();
let elapsed = tick - peer.ask_time;
let timeout = match peer.asking {
PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT_SEC,
PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT_SEC,
PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT_SEC,
PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT,
PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT,
PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT,
PeerAsking::Nothing => false,
PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT_SEC,
PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT_SEC,
PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT_SEC,
PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT,
PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT,
PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT,
};
if timeout {
trace!(target:"sync", "Timeout {}", peer_id);
@ -1848,7 +1848,7 @@ impl ChainSync {
// Check for handshake timeouts
for (peer, &ask_time) in &self.handshaking_peers {
let elapsed = (tick - ask_time) / 1_000_000_000;
if elapsed.as_secs() > STATUS_TIMEOUT_SEC {
if elapsed > STATUS_TIMEOUT {
trace!(target:"sync", "Status timeout {}", peer);
io.disconnect_peer(*peer);
}

View File

@ -58,12 +58,12 @@ mod sync_round;
#[cfg(test)]
mod tests;
// Base number of milliseconds for the header request timeout.
const REQ_TIMEOUT_MILLISECS_BASE: u64 = 7000;
// Additional number of milliseconds for each requested header.
// Base value for the header request timeout.
const REQ_TIMEOUT_BASE: Duration = Duration::from_secs(7);
// Additional value for each requested header.
// If we request N headers, then the timeout will be:
// REQ_TIMEOUT_MILLISECS_BASE + N * REQ_TIMEOUT_MILLISECS_PER_HEADER
const REQ_TIMEOUT_MILLISECS_PER_HEADER: u64 = 10;
// REQ_TIMEOUT_BASE + N * REQ_TIMEOUT_PER_HEADER
const REQ_TIMEOUT_PER_HEADER: Duration = Duration::from_millis(10);
/// Peer chain info.
#[derive(Debug, Clone, PartialEq, Eq)]
@ -585,11 +585,12 @@ impl<L: AsLightClient> LightSync<L> {
if requested_from.contains(peer) { continue }
match ctx.request_from(*peer, request.clone()) {
Ok(id) => {
let timeout_ms = REQ_TIMEOUT_MILLISECS_BASE +
req.max * REQ_TIMEOUT_MILLISECS_PER_HEADER;
assert!(req.max <= u32::max_value() as u64,
"requesting more than 2^32 headers at a time would overflow");
let timeout = REQ_TIMEOUT_BASE + REQ_TIMEOUT_PER_HEADER * req.max as u32;
self.pending_reqs.lock().insert(id.clone(), PendingReq {
started: Instant::now(),
timeout: Duration::from_millis(timeout_ms),
timeout,
});
requested_from.insert(peer.clone());

View File

@ -18,6 +18,7 @@
use std::sync::Arc;
use std::fmt;
use std::time::Duration;
use transaction::{
SignedTransaction, PendingTransaction, UnverifiedTransaction,
@ -50,7 +51,7 @@ extern crate kvdb_memorydb;
const LOCAL_TRANSACTIONS_KEY: &'static [u8] = &*b"LOCAL_TXS";
const UPDATE_TIMER: ::io::TimerToken = 0;
const UPDATE_TIMEOUT_MS: u64 = 15 * 60 * 1000; // once every 15 minutes.
const UPDATE_TIMEOUT: Duration = Duration::from_secs(15 * 60); // once every 15 minutes.
/// Errors which can occur while using the local data store.
#[derive(Debug)]
@ -205,7 +206,7 @@ impl<T: NodeInfo> LocalDataStore<T> {
impl<T: NodeInfo> IoHandler<ClientIoMessage> for LocalDataStore<T> {
fn initialize(&self, io: &::io::IoContext<ClientIoMessage>) {
if let Err(e) = io.register_timer(UPDATE_TIMER, UPDATE_TIMEOUT_MS) {
if let Err(e) = io.register_timer(UPDATE_TIMER, UPDATE_TIMEOUT) {
warn!(target: "local_store", "Error registering local store update timer: {}", e);
}
}

View File

@ -399,7 +399,7 @@ const INFO_TIMER: TimerToken = 0;
impl<T: InformantData> IoHandler<ClientIoMessage> for Informant<T> {
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
io.register_timer(INFO_TIMER, 5000).expect("Error registering timer");
io.register_timer(INFO_TIMER, Duration::from_secs(5)).expect("Error registering timer");
}
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {

View File

@ -35,10 +35,10 @@ use parking_lot::RwLock;
// Attepmt to cull once every 10 minutes.
const TOKEN: TimerToken = 1;
const TIMEOUT_MS: u64 = 1000 * 60 * 10;
const TIMEOUT: Duration = Duration::from_secs(60 * 10);
// But make each attempt last only 9 minutes
const PURGE_TIMEOUT: Duration = Duration::from_millis(1000 * 60 * 9);
const PURGE_TIMEOUT: Duration = Duration::from_secs(60 * 9);
/// Periodically culls the transaction queue of mined transactions.
pub struct QueueCull<T> {
@ -56,7 +56,7 @@ pub struct QueueCull<T> {
impl<T: LightChainClient + 'static> IoHandler<ClientIoMessage> for QueueCull<T> {
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
io.register_timer(TOKEN, TIMEOUT_MS).expect("Error registering timer");
io.register_timer(TOKEN, TIMEOUT).expect("Error registering timer");
}
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {

View File

@ -16,6 +16,7 @@
//! Account management (personal) rpc implementation
use std::sync::Arc;
use std::time::Duration;
use bytes::{Bytes, ToPretty};
use ethcore::account_provider::AccountProvider;
@ -130,8 +131,8 @@ impl<D: Dispatcher + 'static> Personal for PersonalClient<D> {
Some("Restart your client with --geth flag or use personal_sendTransaction instead."),
)),
(true, Some(0)) => store.unlock_account_permanently(account, account_pass),
(true, Some(d)) => store.unlock_account_timed(account, account_pass, d * 1000),
(true, None) => store.unlock_account_timed(account, account_pass, 300_000),
(true, Some(d)) => store.unlock_account_timed(account, account_pass, Duration::from_secs(d.into())),
(true, None) => store.unlock_account_timed(account, account_pass, Duration::from_secs(300)),
};
match r {
Ok(_) => Ok(true),

View File

@ -22,6 +22,7 @@
//! extern crate ethcore_io;
//! use ethcore_io::*;
//! use std::sync::Arc;
//! use std::time::Duration;
//!
//! struct MyHandler;
//!
@ -32,7 +33,7 @@
//!
//! impl IoHandler<MyMessage> for MyHandler {
//! fn initialize(&self, io: &IoContext<MyMessage>) {
//! io.register_timer(0, 1000).unwrap();
//! io.register_timer(0, Duration::from_secs(1)).unwrap();
//! }
//!
//! fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
@ -147,6 +148,7 @@ pub use service::TOKENS_PER_HANDLER;
mod tests {
use std::sync::Arc;
use std::time::Duration;
use super::*;
struct MyHandler;
@ -158,7 +160,7 @@ mod tests {
impl IoHandler<MyMessage> for MyHandler {
fn initialize(&self, io: &IoContext<MyMessage>) {
io.register_timer(0, 1000).unwrap();
io.register_timer(0, Duration::from_secs(1)).unwrap();
}
fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {

View File

@ -54,7 +54,7 @@ pub enum IoMessage<Message> where Message: Send + Clone + Sized {
AddTimer {
handler_id: HandlerId,
token: TimerToken,
delay: u64,
delay: Duration,
once: bool,
},
RemoveTimer {
@ -93,10 +93,10 @@ impl<Message> IoContext<Message> where Message: Send + Clone + Sync + 'static {
}
/// Register a new recurring IO timer. 'IoHandler::timeout' will be called with the token.
pub fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), IoError> {
pub fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), IoError> {
self.channel.send_io(IoMessage::AddTimer {
token: token,
delay: ms,
token,
delay,
handler_id: self.handler,
once: false,
})?;
@ -104,10 +104,10 @@ impl<Message> IoContext<Message> where Message: Send + Clone + Sync + 'static {
}
/// Register a new IO timer once. 'IoHandler::timeout' will be called with the token.
pub fn register_timer_once(&self, token: TimerToken, ms: u64) -> Result<(), IoError> {
pub fn register_timer_once(&self, token: TimerToken, delay: Duration) -> Result<(), IoError> {
self.channel.send_io(IoMessage::AddTimer {
token: token,
delay: ms,
token,
delay,
handler_id: self.handler,
once: true,
})?;
@ -173,7 +173,7 @@ impl<Message> IoContext<Message> where Message: Send + Clone + Sync + 'static {
#[derive(Clone)]
struct UserTimer {
delay: u64,
delay: Duration,
timeout: Timeout,
once: bool,
}
@ -252,7 +252,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
self.timers.write().remove(&token_id);
event_loop.clear_timeout(&timer.timeout);
} else {
event_loop.timeout(token, Duration::from_millis(timer.delay)).expect("Error re-registering user timer");
event_loop.timeout(token, timer.delay).expect("Error re-registering user timer");
}
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index });
self.work_ready.notify_all();
@ -283,7 +283,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
},
IoMessage::AddTimer { handler_id, token, delay, once } => {
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
let timeout = event_loop.timeout(Token(timer_id), Duration::from_millis(delay)).expect("Error registering user timer");
let timeout = event_loop.timeout(Token(timer_id), delay).expect("Error registering user timer");
self.timers.write().insert(timer_id, UserTimer { delay: delay, timeout: timeout, once: once });
},
IoMessage::RemoveTimer { handler_id, token } => {

View File

@ -17,6 +17,7 @@
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::time::Duration;
use hash::{keccak, write_keccak};
use mio::{Token, Ready, PollOpt};
use mio::deprecated::{Handler, EventLoop, TryRead, TryWrite};
@ -37,7 +38,7 @@ use crypto;
use network::{Error, ErrorKind};
const ENCRYPTED_HEADER_LEN: usize = 32;
const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000;
const RECEIVE_PAYLOAD: Duration = Duration::from_secs(30);
pub const MAX_PAYLOAD_SIZE: usize = (1 << 24) - 1;
pub trait GenericSocket : Read + Write {
@ -447,7 +448,7 @@ impl EncryptedConnection {
if let EncryptedConnectionState::Header = self.read_state {
if let Some(data) = self.connection.readable()? {
self.read_header(&data)?;
io.register_timer(self.connection.token, RECIEVE_PAYLOAD_TIMEOUT)?;
io.register_timer(self.connection.token, RECEIVE_PAYLOAD)?;
}
};
if let EncryptedConnectionState::Payload = self.read_state {

View File

@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::time::Duration;
use rand::random;
use hash::write_keccak;
use mio::tcp::*;
@ -73,7 +74,7 @@ pub struct Handshake {
const V4_AUTH_PACKET_SIZE: usize = 307;
const V4_ACK_PACKET_SIZE: usize = 210;
const HANDSHAKE_TIMEOUT: u64 = 5000;
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(5);
const PROTOCOL_VERSION: u64 = 4;
// Amount of bytes added when encrypting with encryptECIES.
const ECIES_OVERHEAD: usize = 113;

View File

@ -24,6 +24,7 @@ use std::cmp::{min, max};
use std::path::{Path, PathBuf};
use std::io::{Read, Write, self};
use std::fs;
use std::time::Duration;
use ethkey::{KeyPair, Secret, Random, Generator};
use hash::keccak;
use mio::*;
@ -67,13 +68,13 @@ const SYS_TIMER: TimerToken = LAST_SESSION + 1;
// Timeouts
// for IDLE TimerToken
const MAINTENANCE_TIMEOUT: u64 = 1000;
const MAINTENANCE_TIMEOUT: Duration = Duration::from_secs(1);
// for DISCOVERY_REFRESH TimerToken
const DISCOVERY_REFRESH_TIMEOUT: u64 = 60_000;
const DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(60);
// for DISCOVERY_ROUND TimerToken
const DISCOVERY_ROUND_TIMEOUT: u64 = 300;
const DISCOVERY_ROUND_TIMEOUT: Duration = Duration::from_millis(300);
// for NODE_TABLE TimerToken
const NODE_TABLE_TIMEOUT: u64 = 300_000;
const NODE_TABLE_TIMEOUT: Duration = Duration::from_secs(300);
#[derive(Debug, PartialEq, Eq)]
/// Protocol info
@ -165,10 +166,10 @@ impl<'s> NetworkContextTrait for NetworkContext<'s> {
self.session.as_ref().map_or(false, |s| s.lock().expired())
}
fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), Error> {
fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error> {
self.io.message(NetworkIoMessage::AddTimer {
token: token,
delay: ms,
token,
delay,
protocol: self.protocol,
}).unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
Ok(())

View File

@ -24,12 +24,13 @@
//! use net::*;
//! use devp2p::NetworkService;
//! use std::sync::Arc;
//! use std::time::Duration;
//!
//! struct MyHandler;
//!
//! impl NetworkProtocolHandler for MyHandler {
//! fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
//! io.register_timer(0, 1000);
//! io.register_timer(0, Duration::from_secs(1));
//! }
//!
//! fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {

View File

@ -34,8 +34,8 @@ use node_table::NodeId;
use snappy;
// Timeout must be less than (interval - 1).
const PING_TIMEOUT_SEC: Duration = Duration::from_secs(60);
const PING_INTERVAL_SEC: Duration = Duration::from_secs(120);
const PING_TIMEOUT: Duration = Duration::from_secs(60);
const PING_INTERVAL: Duration = Duration::from_secs(120);
const MIN_PROTOCOL_VERSION: u32 = 4;
const MIN_COMPRESSION_PROTOCOL_VERSION: u32 = 5;
@ -116,7 +116,7 @@ impl Session {
protocol_version: 0,
capabilities: Vec::new(),
peer_capabilities: Vec::new(),
ping_ms: None,
ping: None,
originated: originated,
remote_address: "Handshake".to_owned(),
local_address: local_addr,
@ -298,12 +298,12 @@ impl Session {
return true;
}
let timed_out = if let Some(pong) = self.pong_time {
pong.duration_since(self.ping_time) > PING_TIMEOUT_SEC
pong.duration_since(self.ping_time) > PING_TIMEOUT
} else {
self.ping_time.elapsed() > PING_TIMEOUT_SEC
self.ping_time.elapsed() > PING_TIMEOUT
};
if !timed_out && self.ping_time.elapsed() > PING_INTERVAL_SEC {
if !timed_out && self.ping_time.elapsed() > PING_INTERVAL {
if let Err(e) = self.send_ping(io) {
debug!("Error sending ping message: {:?}", e);
}
@ -368,9 +368,7 @@ impl Session {
PACKET_PONG => {
let time = Instant::now();
self.pong_time = Some(time);
let ping_elapsed = time.duration_since(self.ping_time);
self.info.ping_ms = Some(ping_elapsed.as_secs() * 1_000 +
ping_elapsed.subsec_nanos() as u64 / 1_000_000);
self.info.ping = Some(time.duration_since(self.ping_time));
Ok(SessionData::Continue)
},
PACKET_GET_PEERS => Ok(SessionData::None), //TODO;

View File

@ -71,7 +71,7 @@ impl TestProtocol {
impl NetworkProtocolHandler for TestProtocol {
fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
io.register_timer(0, 10).unwrap();
io.register_timer(0, Duration::from_millis(10)).unwrap();
}
fn read(&self, _io: &NetworkContext, _peer: &PeerId, packet_id: u8, data: &[u8]) {

View File

@ -37,6 +37,7 @@ use std::collections::HashMap;
use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
use std::str::{self, FromStr};
use std::sync::Arc;
use std::time::Duration;
use ipnetwork::{IpNetwork, IpNetworkError};
use io::IoChannel;
use ethkey::Secret;
@ -74,8 +75,8 @@ pub enum NetworkIoMessage {
protocol: ProtocolId,
/// Timer token.
token: TimerToken,
/// Timer delay in milliseconds.
delay: u64,
/// Timer delay.
delay: Duration,
},
/// Initliaze public interface.
InitPublicInterface,
@ -100,8 +101,8 @@ pub struct SessionInfo {
pub capabilities: Vec<SessionCapabilityInfo>,
/// Peer protocol capabilities
pub peer_capabilities: Vec<PeerCapabilityInfo>,
/// Peer ping delay in milliseconds
pub ping_ms: Option<u64>,
/// Peer ping delay
pub ping: Option<Duration>,
/// True if this session was originated by us.
pub originated: bool,
/// Remote endpoint address of the session
@ -271,7 +272,7 @@ pub trait NetworkContext {
fn is_expired(&self) -> bool;
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), Error>;
fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error>;
/// Returns peer identification string
fn peer_client_version(&self, peer: PeerId) -> String;
@ -315,8 +316,8 @@ impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext {
(**self).is_expired()
}
fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), Error> {
(**self).register_timer(token, ms)
fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error> {
(**self).register_timer(token, delay)
}
fn peer_client_version(&self, peer: PeerId) -> String {

View File

@ -44,6 +44,18 @@ pub struct OutOfBounds<T> {
pub found: T,
}
impl<T> OutOfBounds<T> {
pub fn map<F, U>(self, map: F) -> OutOfBounds<U>
where F: Fn(T) -> U
{
OutOfBounds {
min: self.min.map(&map),
max: self.max.map(&map),
found: map(self.found),
}
}
}
impl<T: fmt::Display> fmt::Display for OutOfBounds<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let msg = match (self.min.as_ref(), self.max.as_ref()) {

View File

@ -36,7 +36,7 @@ mod tests;
// how often periodic relays are. when messages are imported
// we directly broadcast.
const RALLY_TOKEN: TimerToken = 1;
const RALLY_TIMEOUT_MS: u64 = 2500;
const RALLY_TIMEOUT: Duration = Duration::from_millis(2500);
/// Current protocol version.
pub const PROTOCOL_VERSION: usize = 6;
@ -685,7 +685,7 @@ impl<T: MessageHandler> Network<T> {
impl<T: MessageHandler> ::network::NetworkProtocolHandler for Network<T> {
fn initialize(&self, io: &NetworkContext, host_info: &HostInfo) {
// set up broadcast timer (< 1s)
io.register_timer(RALLY_TOKEN, RALLY_TIMEOUT_MS)
io.register_timer(RALLY_TOKEN, RALLY_TIMEOUT)
.expect("Failed to initialize message rally timer");
*self.node_key.write() = host_info.id().clone();