Merge remote-tracking branch 'parity/master'
This commit is contained in:
@@ -7,7 +7,7 @@ version = "1.4.0"
|
||||
authors = ["Ethcore <admin@ethcore.io>"]
|
||||
|
||||
[dependencies]
|
||||
mio = { git = "https://github.com/ethcore/mio", branch = "v0.5.x" }
|
||||
mio = { git = "https://github.com/carllerche/mio" }
|
||||
crossbeam = "0.2"
|
||||
parking_lot = "0.3"
|
||||
log = "0.3"
|
||||
|
||||
@@ -65,7 +65,8 @@ mod service;
|
||||
mod worker;
|
||||
mod panics;
|
||||
|
||||
use mio::{EventLoop, Token};
|
||||
use mio::{Token};
|
||||
use mio::deprecated::{EventLoop, NotifyError};
|
||||
use std::fmt;
|
||||
|
||||
pub use worker::LOCAL_STACK_SIZE;
|
||||
@@ -96,8 +97,8 @@ impl From<::std::io::Error> for IoError {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Message> From<::mio::NotifyError<service::IoMessage<Message>>> for IoError where Message: Send + Clone {
|
||||
fn from(_err: ::mio::NotifyError<service::IoMessage<Message>>) -> IoError {
|
||||
impl<Message> From<NotifyError<service::IoMessage<Message>>> for IoError where Message: Send + Clone {
|
||||
fn from(_err: NotifyError<service::IoMessage<Message>>) -> IoError {
|
||||
IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error"))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,13 +18,16 @@ use std::sync::{Arc, Weak};
|
||||
use std::thread::{self, JoinHandle};
|
||||
use std::collections::HashMap;
|
||||
use mio::*;
|
||||
use mio::timer::{Timeout};
|
||||
use mio::deprecated::{EventLoop, Handler, Sender, EventLoopBuilder};
|
||||
use crossbeam::sync::chase_lev;
|
||||
use slab::Slab;
|
||||
use {IoError, IoHandler};
|
||||
use worker::{Worker, Work, WorkType};
|
||||
use panics::*;
|
||||
use parking_lot::{RwLock};
|
||||
use parking_lot::{RwLock, Mutex};
|
||||
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
|
||||
use std::time::Duration;
|
||||
|
||||
/// Timer ID
|
||||
pub type TimerToken = usize;
|
||||
@@ -223,9 +226,9 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
||||
type Timeout = Token;
|
||||
type Message = IoMessage<Message>;
|
||||
|
||||
fn ready(&mut self, _event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
|
||||
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
|
||||
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
|
||||
fn ready(&mut self, _event_loop: &mut EventLoop<Self>, token: Token, events: Ready) {
|
||||
let handler_index = token.0 / TOKENS_PER_HANDLER;
|
||||
let token_id = token.0 % TOKENS_PER_HANDLER;
|
||||
if let Some(handler) = self.handlers.read().get(handler_index) {
|
||||
if events.is_hup() {
|
||||
self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index });
|
||||
@@ -243,15 +246,15 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
||||
}
|
||||
|
||||
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
|
||||
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
|
||||
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
|
||||
let handler_index = token.0 / TOKENS_PER_HANDLER;
|
||||
let token_id = token.0 % TOKENS_PER_HANDLER;
|
||||
if let Some(handler) = self.handlers.read().get(handler_index) {
|
||||
if let Some(timer) = self.timers.read().get(&token.as_usize()) {
|
||||
if let Some(timer) = self.timers.read().get(&token.0) {
|
||||
if timer.once {
|
||||
self.timers.write().remove(&token_id);
|
||||
event_loop.clear_timeout(timer.timeout);
|
||||
event_loop.clear_timeout(&timer.timeout);
|
||||
} else {
|
||||
event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer");
|
||||
event_loop.timeout(token, Duration::from_millis(timer.delay)).expect("Error re-registering user timer");
|
||||
}
|
||||
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index });
|
||||
self.work_ready.notify_all();
|
||||
@@ -277,18 +280,18 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
||||
let to_remove: Vec<_> = timers.keys().cloned().filter(|timer_id| timer_id / TOKENS_PER_HANDLER == handler_id).collect();
|
||||
for timer_id in to_remove {
|
||||
let timer = timers.remove(&timer_id).expect("to_remove only contains keys from timers; qed");
|
||||
event_loop.clear_timeout(timer.timeout);
|
||||
event_loop.clear_timeout(&timer.timeout);
|
||||
}
|
||||
},
|
||||
IoMessage::AddTimer { handler_id, token, delay, once } => {
|
||||
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
|
||||
let timeout = event_loop.timeout_ms(Token(timer_id), delay).expect("Error registering user timer");
|
||||
let timeout = event_loop.timeout(Token(timer_id), Duration::from_millis(delay)).expect("Error registering user timer");
|
||||
self.timers.write().insert(timer_id, UserTimer { delay: delay, timeout: timeout, once: once });
|
||||
},
|
||||
IoMessage::RemoveTimer { handler_id, token } => {
|
||||
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
|
||||
if let Some(timer) = self.timers.write().remove(&timer_id) {
|
||||
event_loop.clear_timeout(timer.timeout);
|
||||
event_loop.clear_timeout(&timer.timeout);
|
||||
}
|
||||
},
|
||||
IoMessage::RegisterStream { handler_id, token } => {
|
||||
@@ -302,7 +305,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
||||
// unregister a timer associated with the token (if any)
|
||||
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
|
||||
if let Some(timer) = self.timers.write().remove(&timer_id) {
|
||||
event_loop.clear_timeout(timer.timeout);
|
||||
event_loop.clear_timeout(&timer.timeout);
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -391,7 +394,7 @@ impl<Message> IoChannel<Message> where Message: Send + Clone + Sync + 'static {
|
||||
pub struct IoService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
panic_handler: Arc<PanicHandler>,
|
||||
thread: Option<JoinHandle<()>>,
|
||||
host_channel: Sender<IoMessage<Message>>,
|
||||
host_channel: Mutex<Sender<IoMessage<Message>>>,
|
||||
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
|
||||
}
|
||||
|
||||
@@ -405,9 +408,9 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
/// Starts IO event loop
|
||||
pub fn start() -> Result<IoService<Message>, IoError> {
|
||||
let panic_handler = PanicHandler::new_in_arc();
|
||||
let mut config = EventLoopConfig::new();
|
||||
let mut config = EventLoopBuilder::new();
|
||||
config.messages_per_tick(1024);
|
||||
let mut event_loop = EventLoop::configured(config).expect("Error creating event loop");
|
||||
let mut event_loop = config.build().expect("Error creating event loop");
|
||||
let channel = event_loop.channel();
|
||||
let panic = panic_handler.clone();
|
||||
let handlers = Arc::new(RwLock::new(Slab::new(MAX_HANDLERS)));
|
||||
@@ -421,14 +424,14 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
Ok(IoService {
|
||||
panic_handler: panic_handler,
|
||||
thread: Some(thread),
|
||||
host_channel: channel,
|
||||
host_channel: Mutex::new(channel),
|
||||
handlers: handlers,
|
||||
})
|
||||
}
|
||||
|
||||
/// Regiter an IO handler with the event loop.
|
||||
pub fn register_handler(&self, handler: Arc<IoHandler<Message>+Send>) -> Result<(), IoError> {
|
||||
try!(self.host_channel.send(IoMessage::AddHandler {
|
||||
try!(self.host_channel.lock().send(IoMessage::AddHandler {
|
||||
handler: handler,
|
||||
}));
|
||||
Ok(())
|
||||
@@ -436,20 +439,20 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
|
||||
/// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads.
|
||||
pub fn send_message(&self, message: Message) -> Result<(), IoError> {
|
||||
try!(self.host_channel.send(IoMessage::UserMessage(message)));
|
||||
try!(self.host_channel.lock().send(IoMessage::UserMessage(message)));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create a new message channel
|
||||
pub fn channel(&self) -> IoChannel<Message> {
|
||||
IoChannel::new(self.host_channel.clone(), Arc::downgrade(&self.handlers))
|
||||
IoChannel::new(self.host_channel.lock().clone(), Arc::downgrade(&self.handlers))
|
||||
}
|
||||
}
|
||||
|
||||
impl<Message> Drop for IoService<Message> where Message: Send + Sync + Clone {
|
||||
fn drop(&mut self) {
|
||||
trace!(target: "shutdown", "[IoService] Closing...");
|
||||
self.host_channel.send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
|
||||
self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
|
||||
self.thread.take().unwrap().join().ok();
|
||||
trace!(target: "shutdown", "[IoService] Closed.");
|
||||
}
|
||||
|
||||
@@ -8,7 +8,8 @@ authors = ["Ethcore <admin@ethcore.io>"]
|
||||
|
||||
[dependencies]
|
||||
log = "0.3"
|
||||
mio = { git = "https://github.com/ethcore/mio", branch = "v0.5.x" }
|
||||
mio = { git = "https://github.com/carllerche/mio" }
|
||||
bytes = "0.3.0"
|
||||
rand = "0.3.12"
|
||||
time = "0.1.34"
|
||||
tiny-keccak = "1.0"
|
||||
|
||||
@@ -18,7 +18,8 @@ use std::sync::Arc;
|
||||
use std::collections::VecDeque;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
|
||||
use mio::{Handler, Token, EventSet, EventLoop, PollOpt, TryRead, TryWrite};
|
||||
use mio::{Token, Ready, PollOpt};
|
||||
use mio::deprecated::{Handler, EventLoop, TryRead, TryWrite};
|
||||
use mio::tcp::*;
|
||||
use util::hash::*;
|
||||
use util::sha3::*;
|
||||
@@ -34,6 +35,7 @@ use rcrypto::aessafe::*;
|
||||
use rcrypto::symmetriccipher::*;
|
||||
use rcrypto::buffer::*;
|
||||
use tiny_keccak::Keccak;
|
||||
use bytes::{Buf, MutBuf};
|
||||
use crypto;
|
||||
|
||||
const ENCRYPTED_HEADER_LEN: usize = 32;
|
||||
@@ -57,7 +59,7 @@ pub struct GenericConnection<Socket: GenericSocket> {
|
||||
/// Send out packets FIFO
|
||||
send_queue: VecDeque<Cursor<Bytes>>,
|
||||
/// Event flags this connection expects
|
||||
interest: EventSet,
|
||||
interest: Ready,
|
||||
/// Shared network statistics
|
||||
stats: Arc<NetworkStats>,
|
||||
/// Registered flag
|
||||
@@ -81,8 +83,9 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
|
||||
let sock_ref = <Socket as Read>::by_ref(&mut self.socket);
|
||||
loop {
|
||||
let max = self.rec_size - self.rec_buf.len();
|
||||
match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) {
|
||||
match sock_ref.take(max as u64).try_read(unsafe { self.rec_buf.mut_bytes() }) {
|
||||
Ok(Some(size)) if size != 0 => {
|
||||
unsafe { self.rec_buf.advance(size); }
|
||||
self.stats.inc_recv(size);
|
||||
trace!(target:"network", "{}: Read {} of {} bytes", self.token, self.rec_buf.len(), self.rec_size);
|
||||
if self.rec_size != 0 && self.rec_buf.len() == self.rec_size {
|
||||
@@ -109,7 +112,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
|
||||
trace!(target:"network", "{}: Sending {} bytes", self.token, data.len());
|
||||
self.send_queue.push_back(Cursor::new(data));
|
||||
if !self.interest.is_writable() {
|
||||
self.interest.insert(EventSet::writable());
|
||||
self.interest.insert(Ready::writable());
|
||||
}
|
||||
io.update_registration(self.token).ok();
|
||||
}
|
||||
@@ -128,16 +131,19 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
|
||||
{
|
||||
let buf = self.send_queue.front_mut().unwrap();
|
||||
let send_size = buf.get_ref().len();
|
||||
if (buf.position() as usize) >= send_size {
|
||||
let pos = buf.position() as usize;
|
||||
if (pos as usize) >= send_size {
|
||||
warn!(target:"net", "Unexpected connection data");
|
||||
return Ok(WriteStatus::Complete)
|
||||
}
|
||||
match self.socket.try_write_buf(buf) {
|
||||
Ok(Some(size)) if (buf.position() as usize) < send_size => {
|
||||
let buf = buf as &mut Buf;
|
||||
match self.socket.try_write(buf.bytes()) {
|
||||
Ok(Some(size)) if (pos + size) < send_size => {
|
||||
buf.advance(size);
|
||||
self.stats.inc_send(size);
|
||||
Ok(WriteStatus::Ongoing)
|
||||
},
|
||||
Ok(Some(size)) if (buf.position() as usize) == send_size => {
|
||||
Ok(Some(size)) if (pos + size) == send_size => {
|
||||
self.stats.inc_send(size);
|
||||
trace!(target:"network", "{}: Wrote {} bytes", self.token, send_size);
|
||||
Ok(WriteStatus::Complete)
|
||||
@@ -151,7 +157,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
|
||||
self.send_queue.pop_front();
|
||||
}
|
||||
if self.send_queue.is_empty() {
|
||||
self.interest.remove(EventSet::writable());
|
||||
self.interest.remove(Ready::writable());
|
||||
try!(io.update_registration(self.token));
|
||||
}
|
||||
Ok(r)
|
||||
@@ -171,7 +177,7 @@ impl Connection {
|
||||
send_queue: VecDeque::new(),
|
||||
rec_buf: Bytes::new(),
|
||||
rec_size: 0,
|
||||
interest: EventSet::hup() | EventSet::readable(),
|
||||
interest: Ready::hup() | Ready::readable(),
|
||||
stats: stats,
|
||||
registered: AtomicBool::new(false),
|
||||
}
|
||||
@@ -205,7 +211,7 @@ impl Connection {
|
||||
rec_buf: Vec::new(),
|
||||
rec_size: 0,
|
||||
send_queue: self.send_queue.clone(),
|
||||
interest: EventSet::hup(),
|
||||
interest: Ready::hup(),
|
||||
stats: self.stats.clone(),
|
||||
registered: AtomicBool::new(false),
|
||||
})
|
||||
@@ -499,7 +505,7 @@ mod tests {
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use super::super::stats::*;
|
||||
use std::io::{Read, Write, Error, Cursor, ErrorKind};
|
||||
use mio::{EventSet};
|
||||
use mio::{Ready};
|
||||
use std::collections::VecDeque;
|
||||
use util::bytes::*;
|
||||
use devtools::*;
|
||||
@@ -545,7 +551,7 @@ mod tests {
|
||||
send_queue: VecDeque::new(),
|
||||
rec_buf: Bytes::new(),
|
||||
rec_size: 0,
|
||||
interest: EventSet::hup() | EventSet::readable(),
|
||||
interest: Ready::hup() | Ready::readable(),
|
||||
stats: Arc::<NetworkStats>::new(NetworkStats::new()),
|
||||
registered: AtomicBool::new(false),
|
||||
}
|
||||
@@ -568,7 +574,7 @@ mod tests {
|
||||
send_queue: VecDeque::new(),
|
||||
rec_buf: Bytes::new(),
|
||||
rec_size: 0,
|
||||
interest: EventSet::hup() | EventSet::readable(),
|
||||
interest: Ready::hup() | Ready::readable(),
|
||||
stats: Arc::<NetworkStats>::new(NetworkStats::new()),
|
||||
registered: AtomicBool::new(false),
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ use std::collections::{HashSet, HashMap, BTreeMap, VecDeque};
|
||||
use std::mem;
|
||||
use std::default::Default;
|
||||
use mio::*;
|
||||
use mio::deprecated::{Handler, EventLoop};
|
||||
use mio::udp::*;
|
||||
use util::sha3::*;
|
||||
use time;
|
||||
@@ -57,6 +58,7 @@ pub struct NodeEntry {
|
||||
|
||||
pub struct BucketEntry {
|
||||
pub address: NodeEntry,
|
||||
pub id_hash: H256,
|
||||
pub timeout: Option<u64>,
|
||||
}
|
||||
|
||||
@@ -85,6 +87,7 @@ struct Datagramm {
|
||||
|
||||
pub struct Discovery {
|
||||
id: NodeId,
|
||||
id_hash: H256,
|
||||
secret: Secret,
|
||||
public_endpoint: NodeEndpoint,
|
||||
udp_socket: UdpSocket,
|
||||
@@ -106,9 +109,10 @@ pub struct TableUpdates {
|
||||
|
||||
impl Discovery {
|
||||
pub fn new(key: &KeyPair, listen: SocketAddr, public: NodeEndpoint, token: StreamToken, allow_ips: AllowIP) -> Discovery {
|
||||
let socket = UdpSocket::bound(&listen).expect("Error binding UDP socket");
|
||||
let socket = UdpSocket::bind(&listen).expect("Error binding UDP socket");
|
||||
Discovery {
|
||||
id: key.public().clone(),
|
||||
id_hash: key.public().sha3(),
|
||||
secret: key.secret().clone(),
|
||||
public_endpoint: public,
|
||||
token: token,
|
||||
@@ -150,8 +154,9 @@ impl Discovery {
|
||||
|
||||
fn update_node(&mut self, e: NodeEntry) {
|
||||
trace!(target: "discovery", "Inserting {:?}", &e);
|
||||
let id_hash = e.id.sha3();
|
||||
let ping = {
|
||||
let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id, &e.id) as usize).unwrap();
|
||||
let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id_hash, &id_hash) as usize).unwrap();
|
||||
let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) {
|
||||
node.address = e.clone();
|
||||
node.timeout = None;
|
||||
@@ -159,7 +164,7 @@ impl Discovery {
|
||||
} else { false };
|
||||
|
||||
if !updated {
|
||||
bucket.nodes.push_front(BucketEntry { address: e, timeout: None });
|
||||
bucket.nodes.push_front(BucketEntry { address: e, timeout: None, id_hash: id_hash, });
|
||||
}
|
||||
|
||||
if bucket.nodes.len() > BUCKET_SIZE {
|
||||
@@ -174,7 +179,7 @@ impl Discovery {
|
||||
}
|
||||
|
||||
fn clear_ping(&mut self, id: &NodeId) {
|
||||
let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id, id) as usize).unwrap();
|
||||
let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id_hash, &id.sha3()) as usize).unwrap();
|
||||
if let Some(node) = bucket.nodes.iter_mut().find(|n| &n.address.id == id) {
|
||||
node.timeout = None;
|
||||
}
|
||||
@@ -224,8 +229,8 @@ impl Discovery {
|
||||
self.discovery_round += 1;
|
||||
}
|
||||
|
||||
fn distance(a: &NodeId, b: &NodeId) -> u32 {
|
||||
let d = a.sha3() ^ b.sha3();
|
||||
fn distance(a: &H256, b: &H256) -> u32 {
|
||||
let d = *a ^ *b;
|
||||
let mut ret:u32 = 0;
|
||||
for i in 0..32 {
|
||||
let mut v: u8 = d[i];
|
||||
@@ -279,11 +284,12 @@ impl Discovery {
|
||||
fn nearest_node_entries(target: &NodeId, buckets: &[NodeBucket]) -> Vec<NodeEntry> {
|
||||
let mut found: BTreeMap<u32, Vec<&NodeEntry>> = BTreeMap::new();
|
||||
let mut count = 0;
|
||||
let target_hash = target.sha3();
|
||||
|
||||
// Sort nodes by distance to target
|
||||
for bucket in buckets {
|
||||
for node in &bucket.nodes {
|
||||
let distance = Discovery::distance(target, &node.address.id);
|
||||
let distance = Discovery::distance(&target_hash, &node.id_hash);
|
||||
found.entry(distance).or_insert_with(Vec::new).push(&node.address);
|
||||
if count == BUCKET_SIZE {
|
||||
// delete the most distant element
|
||||
@@ -527,15 +533,15 @@ impl Discovery {
|
||||
}
|
||||
|
||||
pub fn register_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> {
|
||||
event_loop.register(&self.udp_socket, Token(self.token), EventSet::all(), PollOpt::edge()).expect("Error registering UDP socket");
|
||||
event_loop.register(&self.udp_socket, Token(self.token), Ready::all(), PollOpt::edge()).expect("Error registering UDP socket");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn update_registration<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> {
|
||||
let registration = if !self.send_queue.is_empty() {
|
||||
EventSet::readable() | EventSet::writable()
|
||||
Ready::readable() | Ready::writable()
|
||||
} else {
|
||||
EventSet::readable()
|
||||
Ready::readable()
|
||||
};
|
||||
event_loop.reregister(&self.udp_socket, Token(self.token), registration, PollOpt::edge()).expect("Error reregistering UDP socket");
|
||||
Ok(())
|
||||
@@ -546,6 +552,7 @@ impl Discovery {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use util::hash::*;
|
||||
use util::sha3::*;
|
||||
use std::net::*;
|
||||
use node_table::*;
|
||||
use std::str::FromStr;
|
||||
@@ -626,7 +633,8 @@ mod tests {
|
||||
for _ in 0..(16 + 10) {
|
||||
buckets[0].nodes.push_back(BucketEntry {
|
||||
address: NodeEntry { id: NodeId::new(), endpoint: ep.clone() },
|
||||
timeout: None
|
||||
timeout: None,
|
||||
id_hash: NodeId::new().sha3(),
|
||||
});
|
||||
}
|
||||
let nearest = Discovery::nearest_node_entries(&NodeId::new(), &buckets);
|
||||
|
||||
@@ -26,6 +26,7 @@ use std::io::{Read, Write};
|
||||
use std::fs;
|
||||
use ethkey::{KeyPair, Secret, Random, Generator};
|
||||
use mio::*;
|
||||
use mio::deprecated::{EventLoop};
|
||||
use mio::tcp::*;
|
||||
use util::hash::*;
|
||||
use util::Hashable;
|
||||
@@ -61,7 +62,7 @@ const SYS_TIMER: usize = LAST_SESSION + 1;
|
||||
|
||||
// Timeouts
|
||||
const MAINTENANCE_TIMEOUT: u64 = 1000;
|
||||
const DISCOVERY_REFRESH_TIMEOUT: u64 = 7200;
|
||||
const DISCOVERY_REFRESH_TIMEOUT: u64 = 60_000;
|
||||
const DISCOVERY_ROUND_TIMEOUT: u64 = 300;
|
||||
const NODE_TABLE_TIMEOUT: u64 = 300_000;
|
||||
|
||||
@@ -744,10 +745,9 @@ impl Host {
|
||||
trace!(target: "network", "Accepting incoming connection");
|
||||
loop {
|
||||
let socket = match self.tcp_listener.lock().accept() {
|
||||
Ok(None) => break,
|
||||
Ok(Some((sock, _addr))) => sock,
|
||||
Ok((sock, _addr)) => sock,
|
||||
Err(e) => {
|
||||
warn!("Error accepting connection: {:?}", e);
|
||||
debug!(target: "network", "Error accepting connection: {:?}", e);
|
||||
break
|
||||
},
|
||||
};
|
||||
@@ -801,29 +801,31 @@ impl Host {
|
||||
},
|
||||
Ok(SessionData::Ready) => {
|
||||
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
|
||||
if !s.info.originated {
|
||||
let session_count = self.session_count();
|
||||
let (max_peers, reserved_only) = {
|
||||
let info = self.info.read();
|
||||
let mut max_peers = info.config.max_peers;
|
||||
for cap in s.info.capabilities.iter() {
|
||||
if let Some(num) = info.config.reserved_protocols.get(&cap.protocol) {
|
||||
max_peers += *num;
|
||||
break;
|
||||
}
|
||||
}
|
||||
(max_peers, info.config.non_reserved_mode == NonReservedPeerMode::Deny)
|
||||
};
|
||||
|
||||
if session_count >= max_peers as usize || reserved_only {
|
||||
// only proceed if the connecting peer is reserved.
|
||||
if !self.reserved_nodes.read().contains(s.id().unwrap()) {
|
||||
s.disconnect(io, DisconnectReason::TooManyPeers);
|
||||
return;
|
||||
let session_count = self.session_count();
|
||||
let (min_peers, max_peers, reserved_only) = {
|
||||
let info = self.info.read();
|
||||
let mut max_peers = info.config.max_peers;
|
||||
for cap in s.info.capabilities.iter() {
|
||||
if let Some(num) = info.config.reserved_protocols.get(&cap.protocol) {
|
||||
max_peers += *num;
|
||||
break;
|
||||
}
|
||||
}
|
||||
(info.config.min_peers as usize, max_peers as usize, info.config.non_reserved_mode == NonReservedPeerMode::Deny)
|
||||
};
|
||||
|
||||
// Add it no node table
|
||||
if reserved_only ||
|
||||
(s.info.originated && session_count >= min_peers) ||
|
||||
(!s.info.originated && session_count >= max_peers) {
|
||||
// only proceed if the connecting peer is reserved.
|
||||
if !self.reserved_nodes.read().contains(s.id().unwrap()) {
|
||||
s.disconnect(io, DisconnectReason::TooManyPeers);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Add it to the node table
|
||||
if !s.info.originated {
|
||||
if let Ok(address) = s.remote_addr() {
|
||||
let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
|
||||
self.nodes.write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
|
||||
@@ -1101,7 +1103,7 @@ impl IoHandler<NetworkIoMessage> for Host {
|
||||
}
|
||||
}
|
||||
DISCOVERY => self.discovery.lock().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
|
||||
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
|
||||
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"),
|
||||
_ => warn!("Unexpected stream registration")
|
||||
}
|
||||
}
|
||||
@@ -1129,7 +1131,7 @@ impl IoHandler<NetworkIoMessage> for Host {
|
||||
}
|
||||
}
|
||||
DISCOVERY => self.discovery.lock().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
|
||||
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
||||
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
||||
_ => warn!("Unexpected stream update")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,6 +70,7 @@ extern crate slab;
|
||||
extern crate ethkey;
|
||||
extern crate ethcrypto as crypto;
|
||||
extern crate rlp;
|
||||
extern crate bytes;
|
||||
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::net::SocketAddr;
|
||||
use std::cmp::Ordering;
|
||||
use std::sync::*;
|
||||
use mio::*;
|
||||
use mio::deprecated::{Handler, EventLoop};
|
||||
use mio::tcp::*;
|
||||
use util::hash::*;
|
||||
use rlp::*;
|
||||
|
||||
@@ -133,7 +133,7 @@ impl<'db> TrieDB<'db> {
|
||||
}
|
||||
|
||||
/// Get the data of the root node.
|
||||
fn root_data<'a, R: 'a + Recorder>(&self, r: &'a mut R) -> super::Result<DBValue> {
|
||||
fn root_data<R: Recorder>(&self, r: &mut R) -> super::Result<DBValue> {
|
||||
self.db.get(self.root).ok_or_else(|| Box::new(TrieError::InvalidStateRoot(*self.root)))
|
||||
.map(|node| { r.record(self.root, &*node, 0); node })
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user