Windows build (#1253)

* Networking refactoring

* Fixed typo

* Trace logging

* Updated dependencies for windows build

* Windows fixes

* use mio 0.5

* nix build

* Windows build fix

* style

* removed unused import

* ipc crate version bump

* ipc config for named pipes

* tweaks and fixes

* tweaks and fixes

* final version bump

* Fixed tests

* Disable color output on windows

* Added missing doc
This commit is contained in:
Arkadiy Paronyan 2016-06-13 18:55:24 +02:00 committed by Gav Wood
parent 4ef4819bf9
commit 6b12334136
12 changed files with 293 additions and 198 deletions

30
Cargo.lock generated
View File

@ -20,7 +20,7 @@ dependencies = [
"ethsync 1.2.0", "ethsync 1.2.0",
"fdlimit 0.1.0", "fdlimit 0.1.0",
"hyper 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"json-ipc-server 0.1.0 (git+https://github.com/ethcore/json-ipc-server.git)", "json-ipc-server 0.2.2 (git+https://github.com/ethcore/json-ipc-server.git)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"number_prefix 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "number_prefix 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
@ -346,7 +346,7 @@ dependencies = [
"ethcore-util 1.2.0", "ethcore-util 1.2.0",
"ethjson 0.1.0", "ethjson 0.1.0",
"ethsync 1.2.0", "ethsync 1.2.0",
"json-ipc-server 0.1.0 (git+https://github.com/ethcore/json-ipc-server.git)", "json-ipc-server 0.2.2 (git+https://github.com/ethcore/json-ipc-server.git)",
"jsonrpc-core 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-http-server 5.1.0 (git+https://github.com/ethcore/jsonrpc-http-server.git)", "jsonrpc-http-server 5.1.0 (git+https://github.com/ethcore/jsonrpc-http-server.git)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -394,7 +394,7 @@ dependencies = [
"lazy_static 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.5.1 (git+https://github.com/ethcore/mio?branch=v0.5.x)",
"nix 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "nix 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rocksdb 0.4.5 (git+https://github.com/ethcore/rust-rocksdb)", "rocksdb 0.4.5 (git+https://github.com/ethcore/rust-rocksdb)",
@ -570,12 +570,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "json-ipc-server" name = "json-ipc-server"
version = "0.1.0" version = "0.2.2"
source = "git+https://github.com/ethcore/json-ipc-server.git#4f9226c4f84dcce2385a188374e3b5fc66b63e68" source = "git+https://github.com/ethcore/json-ipc-server.git#15ef25e5f859d2d27469c92cc13dd1ddea03e444"
dependencies = [ dependencies = [
"bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"miow 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
@ -704,6 +708,22 @@ dependencies = [
"winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "mio"
version = "0.5.1"
source = "git+https://github.com/ethcore/mio?branch=v0.5.x#1fc881771fb8c2517317b4f805d7b88235be422b"
dependencies = [
"bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"miow 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"net2 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)",
"nix 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.5.1" version = "0.5.1"

View File

@ -19,7 +19,6 @@ docopt = "0.6"
time = "0.1" time = "0.1"
ctrlc = { git = "https://github.com/ethcore/rust-ctrlc.git" } ctrlc = { git = "https://github.com/ethcore/rust-ctrlc.git" }
fdlimit = { path = "util/fdlimit" } fdlimit = { path = "util/fdlimit" }
daemonize = "0.2"
num_cpus = "0.2" num_cpus = "0.2"
number_prefix = "0.2" number_prefix = "0.2"
rpassword = "0.2.1" rpassword = "0.2.1"
@ -37,6 +36,9 @@ ethcore-ipc = { path = "ipc/rpc" }
json-ipc-server = { git = "https://github.com/ethcore/json-ipc-server.git" } json-ipc-server = { git = "https://github.com/ethcore/json-ipc-server.git" }
ansi_term = "0.7" ansi_term = "0.7"
[target.'cfg(not(windows))'.dependencies]
daemonize = "0.2"
[dependencies.hyper] [dependencies.hyper]
version = "0.8" version = "0.8"
default-features = false default-features = false

View File

@ -287,10 +287,15 @@ impl Configuration {
} }
fn geth_ipc_path(&self) -> String { fn geth_ipc_path(&self) -> String {
if cfg!(windows) {
r"\\.\pipe\geth.ipc".to_owned()
}
else {
if self.args.flag_testnet { path::ethereum::with_testnet("geth.ipc") } if self.args.flag_testnet { path::ethereum::with_testnet("geth.ipc") }
else { path::ethereum::with_default("geth.ipc") } else { path::ethereum::with_default("geth.ipc") }
.to_str().unwrap().to_owned() .to_str().unwrap().to_owned()
} }
}
pub fn keys_iterations(&self) -> u32 { pub fn keys_iterations(&self) -> u32 {
self.args.flag_keys_iterations self.args.flag_keys_iterations
@ -358,7 +363,18 @@ impl Configuration {
fn ipc_path(&self) -> String { fn ipc_path(&self) -> String {
if self.args.flag_geth { self.geth_ipc_path() } if self.args.flag_geth { self.geth_ipc_path() }
else { Configuration::replace_home(&self.args.flag_ipcpath.clone().unwrap_or(self.args.flag_ipc_path.clone())) } else {
if cfg!(windows) {
r"\\.\pipe\parity.jsonrpc".to_owned()
}
else {
Configuration::replace_home(&self.args.flag_ipcpath.clone().unwrap_or(self.args.flag_ipc_path.clone()))
}
}
}
pub fn have_color(&self) -> bool {
!self.args.flag_no_color && !cfg!(windows)
} }
pub fn signer_port(&self) -> Option<u16> { pub fn signer_port(&self) -> Option<u16> {

View File

@ -32,6 +32,7 @@ extern crate log as rlog;
extern crate env_logger; extern crate env_logger;
extern crate ctrlc; extern crate ctrlc;
extern crate fdlimit; extern crate fdlimit;
#[cfg(not(windows))]
extern crate daemonize; extern crate daemonize;
extern crate time; extern crate time;
extern crate number_prefix; extern crate number_prefix;
@ -86,7 +87,6 @@ use ethcore::service::ClientService;
use ethcore::spec::Spec; use ethcore::spec::Spec;
use ethsync::EthSync; use ethsync::EthSync;
use ethcore::miner::{Miner, MinerService, ExternalMiner}; use ethcore::miner::{Miner, MinerService, ExternalMiner};
use daemonize::Daemonize;
use migration::migrate; use migration::migrate;
use informant::Informant; use informant::Informant;
@ -115,11 +115,7 @@ fn execute(conf: Configuration) {
execute_upgrades(&conf, &spec, &client_config); execute_upgrades(&conf, &spec, &client_config);
if conf.args.cmd_daemon { if conf.args.cmd_daemon {
Daemonize::new() daemonize(&conf);
.pid_file(conf.args.arg_pid_file.clone())
.chown_pid_file(true)
.start()
.unwrap_or_else(|e| die!("Couldn't daemonize; {}", e));
} }
if conf.args.cmd_account { if conf.args.cmd_account {
@ -145,6 +141,20 @@ fn execute(conf: Configuration) {
execute_client(conf, spec, client_config); execute_client(conf, spec, client_config);
} }
#[cfg(not(windows))]
fn daemonize(conf: &Configuration) {
use daemonize::Daemonize;
Daemonize::new()
.pid_file(conf.args.arg_pid_file.clone())
.chown_pid_file(true)
.start()
.unwrap_or_else(|e| die!("Couldn't daemonize; {}", e));
}
#[cfg(windows)]
fn daemonize(_conf: &Configuration) {
}
fn execute_upgrades(conf: &Configuration, spec: &Spec, client_config: &ClientConfig) { fn execute_upgrades(conf: &Configuration, spec: &Spec, client_config: &ClientConfig) {
match ::upgrade::upgrade(Some(&conf.path())) { match ::upgrade::upgrade(Some(&conf.path())) {
Ok(upgrades_applied) if upgrades_applied > 0 => { Ok(upgrades_applied) if upgrades_applied > 0 => {
@ -228,6 +238,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
// setup ipc rpc // setup ipc rpc
let _ipc_server = rpc::new_ipc(conf.ipc_settings(), &dependencies); let _ipc_server = rpc::new_ipc(conf.ipc_settings(), &dependencies);
debug!("IPC: {}", conf.ipc_settings());
if conf.args.flag_webapp { println!("WARNING: Flag -w/--webapp is deprecated. Dapps server is now on by default. Ignoring."); } if conf.args.flag_webapp { println!("WARNING: Flag -w/--webapp is deprecated. Dapps server is now on by default. Ignoring."); }
let dapps_server = dapps::new(dapps::Configuration { let dapps_server = dapps::new(dapps::Configuration {
@ -255,7 +266,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
// Register IO handler // Register IO handler
let io_handler = Arc::new(ClientIoHandler { let io_handler = Arc::new(ClientIoHandler {
client: service.client(), client: service.client(),
info: Informant::new(!conf.args.flag_no_color), info: Informant::new(conf.have_color()),
sync: sync.clone(), sync: sync.clone(),
accounts: account_service.clone(), accounts: account_service.clone(),
}); });
@ -375,8 +386,8 @@ fn execute_import(conf: Configuration) {
panic_handler.forward_from(&service); panic_handler.forward_from(&service);
let client = service.client(); let client = service.client();
let mut instream: Box<Read> = if let Some(f) = conf.args.arg_file { let mut instream: Box<Read> = if let Some(ref f) = conf.args.arg_file {
let f = File::open(&f).unwrap_or_else(|_| die!("Cannot open the file given: {}", f)); let f = File::open(f).unwrap_or_else(|_| die!("Cannot open the file given: {}", f));
Box::new(f) Box::new(f)
} else { } else {
Box::new(::std::io::stdin()) Box::new(::std::io::stdin())
@ -386,7 +397,7 @@ fn execute_import(conf: Configuration) {
let mut first_read = 0; let mut first_read = 0;
let format = match conf.args.flag_format { let format = match conf.args.flag_format {
Some(x) => match x.deref() { Some(ref x) => match x.deref() {
"binary" | "bin" => DataFormat::Binary, "binary" | "bin" => DataFormat::Binary,
"hex" => DataFormat::Hex, "hex" => DataFormat::Hex,
x => die!("Invalid --format parameter given: {:?}", x), x => die!("Invalid --format parameter given: {:?}", x),
@ -407,7 +418,7 @@ fn execute_import(conf: Configuration) {
} }
}; };
let informant = Informant::new(!conf.args.flag_no_color); let informant = Informant::new(conf.have_color());
let do_import = |bytes| { let do_import = |bytes| {
while client.queue_info().is_full() { sleep(Duration::from_secs(1)); } while client.queue_info().is_full() { sleep(Duration::from_secs(1)); }

View File

@ -22,6 +22,7 @@ use util::panics::PanicHandler;
use die::*; use die::*;
use jsonipc; use jsonipc;
use rpc_apis; use rpc_apis;
use std::fmt;
#[cfg(feature = "rpc")] #[cfg(feature = "rpc")]
pub use ethcore_rpc::Server as RpcServer; pub use ethcore_rpc::Server as RpcServer;
@ -44,6 +45,17 @@ pub struct IpcConfiguration {
pub apis: String, pub apis: String,
} }
impl fmt::Display for IpcConfiguration {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.enabled {
write!(f, "endpoint address [{}], api list [{}]", self.socket_addr, self.apis)
}
else {
write!(f, "disabled")
}
}
}
pub struct Dependencies { pub struct Dependencies {
pub panic_handler: Arc<PanicHandler>, pub panic_handler: Arc<PanicHandler>,
pub apis: Arc<rpc_apis::Dependencies>, pub apis: Arc<rpc_apis::Dependencies>,
@ -66,12 +78,6 @@ pub fn new_http(conf: HttpConfiguration, deps: &Dependencies) -> Option<RpcServe
Some(setup_http_rpc_server(deps, &addr, conf.cors, apis)) Some(setup_http_rpc_server(deps, &addr, conf.cors, apis))
} }
pub fn new_ipc(conf: IpcConfiguration, deps: &Dependencies) -> Option<jsonipc::Server> {
if !conf.enabled { return None; }
let apis = conf.apis.split(',').collect();
Some(setup_ipc_rpc_server(deps, &conf.socket_addr, apis))
}
fn setup_rpc_server(apis: Vec<&str>, deps: &Dependencies) -> Server { fn setup_rpc_server(apis: Vec<&str>, deps: &Dependencies) -> Server {
let apis = rpc_apis::from_str(apis); let apis = rpc_apis::from_str(apis);
let server = Server::new(); let server = Server::new();
@ -109,10 +115,18 @@ pub fn setup_http_rpc_server(
}, },
} }
} }
#[cfg(not(feature = "rpc"))] #[cfg(not(feature = "rpc"))]
pub fn setup_ipc_rpc_server(_dependencies: &Dependencies, _addr: &str, _apis: Vec<&str>) -> ! { pub fn setup_ipc_rpc_server(_dependencies: &Dependencies, _addr: &str, _apis: Vec<&str>) -> ! {
die!("Your Parity version has been compiled without JSON-RPC support.") die!("Your Parity version has been compiled without JSON-RPC support.")
} }
pub fn new_ipc(conf: IpcConfiguration, deps: &Dependencies) -> Option<jsonipc::Server> {
if !conf.enabled { return None; }
let apis = conf.apis.split(',').collect();
Some(setup_ipc_rpc_server(deps, &conf.socket_addr, apis))
}
#[cfg(feature = "rpc")] #[cfg(feature = "rpc")]
pub fn setup_ipc_rpc_server(dependencies: &Dependencies, addr: &str, apis: Vec<&str>) -> jsonipc::Server { pub fn setup_ipc_rpc_server(dependencies: &Dependencies, addr: &str, apis: Vec<&str>) -> jsonipc::Server {
let server = setup_rpc_server(apis, dependencies); let server = setup_rpc_server(apis, dependencies);

View File

@ -12,7 +12,7 @@ log = "0.3"
env_logger = "0.3" env_logger = "0.3"
rustc-serialize = "0.3" rustc-serialize = "0.3"
arrayvec = "0.3" arrayvec = "0.3"
mio = "0.5.1" mio = { git = "https://github.com/ethcore/mio", branch = "v0.5.x" }
nix ="0.5.0" nix ="0.5.0"
rand = "0.3.12" rand = "0.3.12"
time = "0.1.34" time = "0.1.34"

View File

@ -466,6 +466,7 @@ pub struct KeyDirectory {
} }
/// Restricts the permissions of given path only to the owner. /// Restricts the permissions of given path only to the owner.
#[cfg(not(windows))]
pub fn restrict_permissions_owner(file_path: &Path) -> Result<(), i32> { pub fn restrict_permissions_owner(file_path: &Path) -> Result<(), i32> {
let cstr = ::std::ffi::CString::new(file_path.to_str().unwrap()).unwrap(); let cstr = ::std::ffi::CString::new(file_path.to_str().unwrap()).unwrap();
match unsafe { ::libc::chmod(cstr.as_ptr(), ::libc::S_IWUSR | ::libc::S_IRUSR) } { match unsafe { ::libc::chmod(cstr.as_ptr(), ::libc::S_IWUSR | ::libc::S_IRUSR) } {
@ -474,6 +475,13 @@ pub fn restrict_permissions_owner(file_path: &Path) -> Result<(), i32> {
} }
} }
/// Restricts the permissions of given path only to the owner.
#[cfg(windows)]
pub fn restrict_permissions_owner(_file_path: &Path) -> Result<(), i32> {
//TODO: implement me
Ok(())
}
impl KeyDirectory { impl KeyDirectory {
/// Initializes new cache directory context with a given `path` /// Initializes new cache directory context with a given `path`
pub fn new(path: &Path) -> KeyDirectory { pub fn new(path: &Path) -> KeyDirectory {

View File

@ -17,6 +17,7 @@
use std::sync::Arc; use std::sync::Arc;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use mio::{Handler, Token, EventSet, EventLoop, PollOpt, TryRead, TryWrite}; use mio::{Handler, Token, EventSet, EventLoop, PollOpt, TryRead, TryWrite};
use mio::tcp::*; use mio::tcp::*;
use hash::*; use hash::*;
@ -60,45 +61,57 @@ pub struct GenericConnection<Socket: GenericSocket> {
interest: EventSet, interest: EventSet,
/// Shared network statistics /// Shared network statistics
stats: Arc<NetworkStats>, stats: Arc<NetworkStats>,
/// Registered flag
registered: AtomicBool,
} }
impl<Socket: GenericSocket> GenericConnection<Socket> { impl<Socket: GenericSocket> GenericConnection<Socket> {
pub fn expect(&mut self, size: usize) { pub fn expect(&mut self, size: usize) {
trace!(target:"network", "Expect to read {} bytes", size);
if self.rec_size != self.rec_buf.len() { if self.rec_size != self.rec_buf.len() {
warn!(target:"net", "Unexpected connection read start"); warn!(target:"network", "Unexpected connection read start");
} }
unsafe { self.rec_buf.set_len(0) }
self.rec_size = size; self.rec_size = size;
} }
/// Readable IO handler. Called when there is some data to be read. /// Readable IO handler. Called when there is some data to be read.
pub fn readable(&mut self) -> io::Result<Option<Bytes>> { pub fn readable(&mut self) -> io::Result<Option<Bytes>> {
if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size { if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size {
warn!(target:"net", "Unexpected connection read"); warn!(target:"network", "Unexpected connection read");
} }
let max = self.rec_size - self.rec_buf.len();
// resolve "multiple applicable items in scope [E0034]" error
let sock_ref = <Socket as Read>::by_ref(&mut self.socket); let sock_ref = <Socket as Read>::by_ref(&mut self.socket);
loop {
let max = self.rec_size - self.rec_buf.len();
match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) { match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) {
Ok(Some(size)) if size != 0 => { Ok(Some(size)) if size != 0 => {
self.stats.inc_recv(size); self.stats.inc_recv(size);
trace!(target:"network", "{}: Read {} of {} bytes", self.token, self.rec_buf.len(), self.rec_size);
if self.rec_size != 0 && self.rec_buf.len() == self.rec_size { if self.rec_size != 0 && self.rec_buf.len() == self.rec_size {
self.rec_size = 0; self.rec_size = 0;
Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new()))) return Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new())))
} else { Ok(None) } }
else if self.rec_buf.len() > self.rec_size {
warn!(target:"network", "Read past buffer {} bytes", self.rec_buf.len() - self.rec_size);
return Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new())))
}
}, },
Ok(_) => Ok(None), Ok(_) => return Ok(None),
Err(e) => Err(e), Err(e) => {
debug!(target:"network", "Read error {} ({})", self.token, e);
return Err(e)
}
}
} }
} }
/// Add a packet to send queue. /// Add a packet to send queue.
pub fn send(&mut self, data: Bytes) { pub fn send<Message>(&mut self, io: &IoContext<Message>, data: Bytes) where Message: Send + Clone {
if !data.is_empty() { if !data.is_empty() {
self.send_queue.push_back(Cursor::new(data)); self.send_queue.push_back(Cursor::new(data));
} }
if !self.interest.is_writable() { if !self.interest.is_writable() {
self.interest.insert(EventSet::writable()); self.interest.insert(EventSet::writable());
io.update_registration(self.token).ok();
} }
} }
@ -108,7 +121,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
} }
/// Writable IO handler. Called when the socket is ready to send. /// Writable IO handler. Called when the socket is ready to send.
pub fn writable(&mut self) -> io::Result<WriteStatus> { pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<WriteStatus, UtilError> where Message: Send + Clone {
if self.send_queue.is_empty() { if self.send_queue.is_empty() {
return Ok(WriteStatus::Complete) return Ok(WriteStatus::Complete)
} }
@ -121,7 +134,6 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
} }
match self.socket.try_write_buf(buf) { match self.socket.try_write_buf(buf) {
Ok(Some(size)) if (buf.position() as usize) < send_size => { Ok(Some(size)) if (buf.position() as usize) < send_size => {
self.interest.insert(EventSet::writable());
self.stats.inc_send(size); self.stats.inc_send(size);
Ok(WriteStatus::Ongoing) Ok(WriteStatus::Ongoing)
}, },
@ -131,7 +143,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
}, },
Ok(Some(_)) => { panic!("Wrote past buffer");}, Ok(Some(_)) => { panic!("Wrote past buffer");},
Ok(None) => Ok(WriteStatus::Ongoing), Ok(None) => Ok(WriteStatus::Ongoing),
Err(e) => Err(e) Err(e) => try!(Err(e))
} }
}.and_then(|r| { }.and_then(|r| {
if r == WriteStatus::Complete { if r == WriteStatus::Complete {
@ -139,9 +151,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
} }
if self.send_queue.is_empty() { if self.send_queue.is_empty() {
self.interest.remove(EventSet::writable()); self.interest.remove(EventSet::writable());
} try!(io.update_registration(self.token));
else {
self.interest.insert(EventSet::writable());
} }
Ok(r) Ok(r)
}) })
@ -162,6 +172,7 @@ impl Connection {
rec_size: 0, rec_size: 0,
interest: EventSet::hup() | EventSet::readable(), interest: EventSet::hup() | EventSet::readable(),
stats: stats, stats: stats,
registered: AtomicBool::new(false),
} }
} }
@ -188,27 +199,36 @@ impl Connection {
rec_buf: Vec::new(), rec_buf: Vec::new(),
rec_size: 0, rec_size: 0,
send_queue: self.send_queue.clone(), send_queue: self.send_queue.clone(),
interest: EventSet::hup() | EventSet::readable(), interest: EventSet::hup(),
stats: self.stats.clone(), stats: self.stats.clone(),
registered: AtomicBool::new(false),
}) })
} }
/// Register this connection with the IO event loop. /// Register this connection with the IO event loop.
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> { pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
if self.registered.load(AtomicOrdering::SeqCst) {
return Ok(());
}
trace!(target: "network", "connection register; token={:?}", reg); trace!(target: "network", "connection register; token={:?}", reg);
if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */) { // TODO: oneshot is broken on windows if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */) { // TODO: oneshot is broken on windows
trace!(target: "network", "Failed to register {:?}, {:?}", reg, e); trace!(target: "network", "Failed to register {:?}, {:?}", reg, e);
} }
self.registered.store(true, AtomicOrdering::SeqCst);
Ok(()) Ok(())
} }
/// Update connection registration. Should be called at the end of the IO handler. /// Update connection registration. Should be called at the end of the IO handler.
pub fn update_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> { pub fn update_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "network", "connection reregister; token={:?}", reg); trace!(target: "network", "connection reregister; token={:?}", reg);
event_loop.reregister( &self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */ ).or_else(|e| { // TODO: oneshot is broken on windows if !self.registered.load(AtomicOrdering::SeqCst) {
self.register_socket(reg, event_loop)
} else {
event_loop.reregister(&self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */ ).unwrap_or_else(|e| { // TODO: oneshot is broken on windows
trace!(target: "network", "Failed to reregister {:?}, {:?}", reg, e); trace!(target: "network", "Failed to reregister {:?}, {:?}", reg, e);
});
Ok(()) Ok(())
}) }
} }
/// Delete connection registration. Should be called at the end of the IO handler. /// Delete connection registration. Should be called at the end of the IO handler.
@ -266,7 +286,7 @@ pub struct EncryptedConnection {
} }
impl EncryptedConnection { impl EncryptedConnection {
/// Create an encrypted connection out of the handshake. Consumes a handshake object. /// Create an encrypted connection out of the handshake.
pub fn new(handshake: &mut Handshake) -> Result<EncryptedConnection, UtilError> { pub fn new(handshake: &mut Handshake) -> Result<EncryptedConnection, UtilError> {
let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_ephemeral)); let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_ephemeral));
let mut nonce_material = H512::new(); let mut nonce_material = H512::new();
@ -320,7 +340,7 @@ impl EncryptedConnection {
} }
/// Send a packet /// Send a packet
pub fn send_packet(&mut self, payload: &[u8]) -> Result<(), UtilError> { pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, payload: &[u8]) -> Result<(), UtilError> where Message: Send + Clone {
let mut header = RlpStream::new(); let mut header = RlpStream::new();
let len = payload.len() as usize; let len = payload.len() as usize;
header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1); header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1);
@ -342,7 +362,7 @@ impl EncryptedConnection {
self.egress_mac.update(&packet[32..(32 + len + padding)]); self.egress_mac.update(&packet[32..(32 + len + padding)]);
EncryptedConnection::update_mac(&mut self.egress_mac, &mut self.mac_encoder, &[0u8; 0]); EncryptedConnection::update_mac(&mut self.egress_mac, &mut self.mac_encoder, &[0u8; 0]);
self.egress_mac.clone().finalize(&mut packet[(32 + len + padding)..]); self.egress_mac.clone().finalize(&mut packet[(32 + len + padding)..]);
self.connection.send(packet); self.connection.send(io, packet);
Ok(()) Ok(())
} }
@ -417,15 +437,13 @@ impl EncryptedConnection {
/// Readable IO handler. Tracker receive status and returns decoded packet if avaialable. /// Readable IO handler. Tracker receive status and returns decoded packet if avaialable.
pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Result<Option<Packet>, UtilError> where Message: Send + Clone{ pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Result<Option<Packet>, UtilError> where Message: Send + Clone{
io.clear_timer(self.connection.token).unwrap(); io.clear_timer(self.connection.token).unwrap();
match self.read_state { if let EncryptedConnectionState::Header = self.read_state {
EncryptedConnectionState::Header => {
if let Some(data) = try!(self.connection.readable()) { if let Some(data) = try!(self.connection.readable()) {
try!(self.read_header(&data)); try!(self.read_header(&data));
try!(io.register_timer(self.connection.token, RECIEVE_PAYLOAD_TIMEOUT)); try!(io.register_timer(self.connection.token, RECIEVE_PAYLOAD_TIMEOUT));
} }
Ok(None) };
}, if let EncryptedConnectionState::Payload = self.read_state {
EncryptedConnectionState::Payload => {
match try!(self.connection.readable()) { match try!(self.connection.readable()) {
Some(data) => { Some(data) => {
self.read_state = EncryptedConnectionState::Header; self.read_state = EncryptedConnectionState::Header;
@ -434,14 +452,14 @@ impl EncryptedConnection {
}, },
None => Ok(None) None => Ok(None)
} }
} } else {
Ok(None)
} }
} }
/// Writable IO handler. Processes send queeue. /// Writable IO handler. Processes send queeue.
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), UtilError> where Message: Send + Clone { pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), UtilError> where Message: Send + Clone {
io.clear_timer(self.connection.token).unwrap(); try!(self.connection.writable(io));
try!(self.connection.writable());
Ok(()) Ok(())
} }
} }
@ -472,12 +490,14 @@ pub fn test_encryption() {
mod tests { mod tests {
use super::*; use super::*;
use std::sync::*; use std::sync::*;
use std::sync::atomic::AtomicBool;
use super::super::stats::*; use super::super::stats::*;
use std::io::{Read, Write, Error, Cursor, ErrorKind}; use std::io::{Read, Write, Error, Cursor, ErrorKind};
use mio::{EventSet}; use mio::{EventSet};
use std::collections::VecDeque; use std::collections::VecDeque;
use bytes::*; use bytes::*;
use devtools::*; use devtools::*;
use io::*;
impl GenericSocket for TestSocket {} impl GenericSocket for TestSocket {}
@ -521,6 +541,7 @@ mod tests {
rec_size: 0, rec_size: 0,
interest: EventSet::hup() | EventSet::readable(), interest: EventSet::hup() | EventSet::readable(),
stats: Arc::<NetworkStats>::new(NetworkStats::new()), stats: Arc::<NetworkStats>::new(NetworkStats::new()),
registered: AtomicBool::new(false),
} }
} }
} }
@ -543,10 +564,15 @@ mod tests {
rec_size: 0, rec_size: 0,
interest: EventSet::hup() | EventSet::readable(), interest: EventSet::hup() | EventSet::readable(),
stats: Arc::<NetworkStats>::new(NetworkStats::new()), stats: Arc::<NetworkStats>::new(NetworkStats::new()),
registered: AtomicBool::new(false),
} }
} }
} }
fn test_io() -> IoContext<i32> {
IoContext::new(IoChannel::disconnected(), 0)
}
#[test] #[test]
fn connection_expect() { fn connection_expect() {
let mut connection = TestConnection::new(); let mut connection = TestConnection::new();
@ -557,7 +583,7 @@ mod tests {
#[test] #[test]
fn connection_write_empty() { fn connection_write_empty() {
let mut connection = TestConnection::new(); let mut connection = TestConnection::new();
let status = connection.writable(); let status = connection.writable(&test_io());
assert!(status.is_ok()); assert!(status.is_ok());
assert!(WriteStatus::Complete == status.unwrap()); assert!(WriteStatus::Complete == status.unwrap());
} }
@ -568,7 +594,7 @@ mod tests {
let data = Cursor::new(vec![0; 10240]); let data = Cursor::new(vec![0; 10240]);
connection.send_queue.push_back(data); connection.send_queue.push_back(data);
let status = connection.writable(); let status = connection.writable(&test_io());
assert!(status.is_ok()); assert!(status.is_ok());
assert!(WriteStatus::Complete == status.unwrap()); assert!(WriteStatus::Complete == status.unwrap());
assert_eq!(10240, connection.socket.write_buffer.len()); assert_eq!(10240, connection.socket.write_buffer.len());
@ -581,7 +607,7 @@ mod tests {
let data = Cursor::new(vec![0; 10240]); let data = Cursor::new(vec![0; 10240]);
connection.send_queue.push_back(data); connection.send_queue.push_back(data);
let status = connection.writable(); let status = connection.writable(&test_io());
assert!(status.is_ok()); assert!(status.is_ok());
assert!(WriteStatus::Ongoing == status.unwrap()); assert!(WriteStatus::Ongoing == status.unwrap());
@ -594,7 +620,7 @@ mod tests {
let data = Cursor::new(vec![0; 10240]); let data = Cursor::new(vec![0; 10240]);
connection.send_queue.push_back(data); connection.send_queue.push_back(data);
let status = connection.writable(); let status = connection.writable(&test_io());
assert!(!status.is_ok()); assert!(!status.is_ok());
assert_eq!(1, connection.send_queue.len()); assert_eq!(1, connection.send_queue.len());

View File

@ -111,7 +111,7 @@ impl Handshake {
self.originated = originated; self.originated = originated;
io.register_timer(self.connection.token, HANDSHAKE_TIMEOUT).ok(); io.register_timer(self.connection.token, HANDSHAKE_TIMEOUT).ok();
if originated { if originated {
try!(self.write_auth(host.secret(), host.id())); try!(self.write_auth(io, host.secret(), host.id()));
} }
else { else {
self.state = HandshakeState::ReadingAuth; self.state = HandshakeState::ReadingAuth;
@ -128,17 +128,17 @@ impl Handshake {
/// Readable IO handler. Drives the state change. /// Readable IO handler. Drives the state change.
pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone { pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone {
if !self.expired() { if !self.expired() {
io.clear_timer(self.connection.token).unwrap(); io.clear_timer(self.connection.token).ok();
match self.state { match self.state {
HandshakeState::New => {} HandshakeState::New => {}
HandshakeState::ReadingAuth => { HandshakeState::ReadingAuth => {
if let Some(data) = try!(self.connection.readable()) { if let Some(data) = try!(self.connection.readable()) {
try!(self.read_auth(host.secret(), &data)); try!(self.read_auth(io, host.secret(), &data));
}; };
}, },
HandshakeState::ReadingAuthEip8 => { HandshakeState::ReadingAuthEip8 => {
if let Some(data) = try!(self.connection.readable()) { if let Some(data) = try!(self.connection.readable()) {
try!(self.read_auth_eip8(host.secret(), &data)); try!(self.read_auth_eip8(io, host.secret(), &data));
}; };
}, },
HandshakeState::ReadingAck => { HandshakeState::ReadingAck => {
@ -153,9 +153,6 @@ impl Handshake {
}, },
HandshakeState::StartSession => {}, HandshakeState::StartSession => {},
} }
if self.state != HandshakeState::StartSession {
try!(io.update_registration(self.connection.token));
}
} }
Ok(()) Ok(())
} }
@ -163,11 +160,7 @@ impl Handshake {
/// Writabe IO handler. /// Writabe IO handler.
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), UtilError> where Message: Send + Clone { pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), UtilError> where Message: Send + Clone {
if !self.expired() { if !self.expired() {
io.clear_timer(self.connection.token).unwrap(); try!(self.connection.writable(io));
try!(self.connection.writable());
if self.state != HandshakeState::StartSession {
io.update_registration(self.connection.token).unwrap();
}
} }
Ok(()) Ok(())
} }
@ -183,7 +176,7 @@ impl Handshake {
} }
/// Parse, validate and confirm auth message /// Parse, validate and confirm auth message
fn read_auth(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { fn read_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), UtilError> where Message: Send + Clone {
trace!(target:"network", "Received handshake auth from {:?}", self.connection.remote_addr_str()); trace!(target:"network", "Received handshake auth from {:?}", self.connection.remote_addr_str());
if data.len() != V4_AUTH_PACKET_SIZE { if data.len() != V4_AUTH_PACKET_SIZE {
debug!(target:"net", "Wrong auth packet size"); debug!(target:"net", "Wrong auth packet size");
@ -197,7 +190,7 @@ impl Handshake {
let (pubk, rest) = rest.split_at(64); let (pubk, rest) = rest.split_at(64);
let (nonce, _) = rest.split_at(32); let (nonce, _) = rest.split_at(32);
try!(self.set_auth(secret, sig, pubk, nonce, PROTOCOL_VERSION)); try!(self.set_auth(secret, sig, pubk, nonce, PROTOCOL_VERSION));
try!(self.write_ack()); try!(self.write_ack(io));
} }
Err(_) => { Err(_) => {
// Try to interpret as EIP-8 packet // Try to interpret as EIP-8 packet
@ -214,7 +207,7 @@ impl Handshake {
Ok(()) Ok(())
} }
fn read_auth_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { fn read_auth_eip8<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), UtilError> where Message: Send + Clone {
trace!(target:"network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str()); trace!(target:"network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str());
self.auth_cipher.extend_from_slice(data); self.auth_cipher.extend_from_slice(data);
let auth = try!(ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..])); let auth = try!(ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..]));
@ -224,13 +217,13 @@ impl Handshake {
let remote_nonce: H256 = try!(rlp.val_at(2)); let remote_nonce: H256 = try!(rlp.val_at(2));
let remote_version: u64 = try!(rlp.val_at(3)); let remote_version: u64 = try!(rlp.val_at(3));
try!(self.set_auth(secret, &signature, &remote_public, &remote_nonce, remote_version)); try!(self.set_auth(secret, &signature, &remote_public, &remote_nonce, remote_version));
try!(self.write_ack_eip8()); try!(self.write_ack_eip8(io));
Ok(()) Ok(())
} }
/// Parse and validate ack message /// Parse and validate ack message
fn read_ack(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { fn read_ack(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"network", "Received handshake auth to {:?}", self.connection.remote_addr_str()); trace!(target:"network", "Received handshake ack from {:?}", self.connection.remote_addr_str());
if data.len() != V4_ACK_PACKET_SIZE { if data.len() != V4_ACK_PACKET_SIZE {
debug!(target:"net", "Wrong ack packet size"); debug!(target:"net", "Wrong ack packet size");
return Err(From::from(NetworkError::BadProtocol)); return Err(From::from(NetworkError::BadProtocol));
@ -270,7 +263,7 @@ impl Handshake {
} }
/// Sends auth message /// Sends auth message
fn write_auth(&mut self, secret: &Secret, public: &Public) -> Result<(), UtilError> { fn write_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, public: &Public) -> Result<(), UtilError> where Message: Send + Clone {
trace!(target:"network", "Sending handshake auth to {:?}", self.connection.remote_addr_str()); trace!(target:"network", "Sending handshake auth to {:?}", self.connection.remote_addr_str());
let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants
let len = data.len(); let len = data.len();
@ -290,14 +283,14 @@ impl Handshake {
} }
let message = try!(crypto::ecies::encrypt(&self.id, &[], &data)); let message = try!(crypto::ecies::encrypt(&self.id, &[], &data));
self.auth_cipher = message.clone(); self.auth_cipher = message.clone();
self.connection.send(message); self.connection.send(io, message);
self.connection.expect(V4_ACK_PACKET_SIZE); self.connection.expect(V4_ACK_PACKET_SIZE);
self.state = HandshakeState::ReadingAck; self.state = HandshakeState::ReadingAck;
Ok(()) Ok(())
} }
/// Sends ack message /// Sends ack message
fn write_ack(&mut self) -> Result<(), UtilError> { fn write_ack<Message>(&mut self, io: &IoContext<Message>) -> Result<(), UtilError> where Message: Send + Clone {
trace!(target:"network", "Sending handshake ack to {:?}", self.connection.remote_addr_str()); trace!(target:"network", "Sending handshake ack to {:?}", self.connection.remote_addr_str());
let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants
let len = data.len(); let len = data.len();
@ -310,13 +303,13 @@ impl Handshake {
} }
let message = try!(crypto::ecies::encrypt(&self.id, &[], &data)); let message = try!(crypto::ecies::encrypt(&self.id, &[], &data));
self.ack_cipher = message.clone(); self.ack_cipher = message.clone();
self.connection.send(message); self.connection.send(io, message);
self.state = HandshakeState::StartSession; self.state = HandshakeState::StartSession;
Ok(()) Ok(())
} }
/// Sends EIP8 ack message /// Sends EIP8 ack message
fn write_ack_eip8(&mut self) -> Result<(), UtilError> { fn write_ack_eip8<Message>(&mut self, io: &IoContext<Message>) -> Result<(), UtilError> where Message: Send + Clone {
trace!(target:"network", "Sending EIP8 handshake ack to {:?}", self.connection.remote_addr_str()); trace!(target:"network", "Sending EIP8 handshake ack to {:?}", self.connection.remote_addr_str());
let mut rlp = RlpStream::new_list(3); let mut rlp = RlpStream::new_list(3);
rlp.append(self.ecdhe.public()); rlp.append(self.ecdhe.public());
@ -333,7 +326,7 @@ impl Handshake {
let message = try!(crypto::ecies::encrypt(&self.id, &prefix, &encoded)); let message = try!(crypto::ecies::encrypt(&self.id, &prefix, &encoded));
self.ack_cipher.extend_from_slice(&prefix); self.ack_cipher.extend_from_slice(&prefix);
self.ack_cipher.extend_from_slice(&message); self.ack_cipher.extend_from_slice(&message);
self.connection.send(self.ack_cipher.clone()); self.connection.send(io, self.ack_cipher.clone());
self.state = HandshakeState::StartSession; self.state = HandshakeState::StartSession;
Ok(()) Ok(())
} }
@ -347,6 +340,7 @@ mod test {
use super::*; use super::*;
use crypto::*; use crypto::*;
use hash::*; use hash::*;
use io::*;
use std::net::SocketAddr; use std::net::SocketAddr;
use mio::tcp::TcpStream; use mio::tcp::TcpStream;
use network::stats::NetworkStats; use network::stats::NetworkStats;
@ -371,6 +365,10 @@ mod test {
Handshake::new(0, to, socket, &nonce, Arc::new(NetworkStats::new())).unwrap() Handshake::new(0, to, socket, &nonce, Arc::new(NetworkStats::new())).unwrap()
} }
fn test_io() -> IoContext<i32> {
IoContext::new(IoChannel::disconnected(), 0)
}
#[test] #[test]
fn test_handshake_auth_plain() { fn test_handshake_auth_plain() {
let mut h = create_handshake(None); let mut h = create_handshake(None);
@ -387,7 +385,7 @@ mod test {
a4592ee77e2bd94d0be3691f3b406f9bba9b591fc63facc016bfa8\ a4592ee77e2bd94d0be3691f3b406f9bba9b591fc63facc016bfa8\
".from_hex().unwrap(); ".from_hex().unwrap();
h.read_auth(&secret, &auth).unwrap(); h.read_auth(&test_io(), &secret, &auth).unwrap();
assert_eq!(h.state, super::HandshakeState::StartSession); assert_eq!(h.state, super::HandshakeState::StartSession);
check_auth(&h, 4); check_auth(&h, 4);
} }
@ -411,9 +409,9 @@ mod test {
3bf7678318e2d5b5340c9e488eefea198576344afbdf66db5f51204a6961a63ce072c8926c\ 3bf7678318e2d5b5340c9e488eefea198576344afbdf66db5f51204a6961a63ce072c8926c\
".from_hex().unwrap(); ".from_hex().unwrap();
h.read_auth(&secret, &auth[0..super::V4_AUTH_PACKET_SIZE]).unwrap(); h.read_auth(&test_io(), &secret, &auth[0..super::V4_AUTH_PACKET_SIZE]).unwrap();
assert_eq!(h.state, super::HandshakeState::ReadingAuthEip8); assert_eq!(h.state, super::HandshakeState::ReadingAuthEip8);
h.read_auth_eip8(&secret, &auth[super::V4_AUTH_PACKET_SIZE..]).unwrap(); h.read_auth_eip8(&test_io(), &secret, &auth[super::V4_AUTH_PACKET_SIZE..]).unwrap();
assert_eq!(h.state, super::HandshakeState::StartSession); assert_eq!(h.state, super::HandshakeState::StartSession);
check_auth(&h, 4); check_auth(&h, 4);
} }
@ -438,9 +436,9 @@ mod test {
d490\ d490\
".from_hex().unwrap(); ".from_hex().unwrap();
h.read_auth(&secret, &auth[0..super::V4_AUTH_PACKET_SIZE]).unwrap(); h.read_auth(&test_io(), &secret, &auth[0..super::V4_AUTH_PACKET_SIZE]).unwrap();
assert_eq!(h.state, super::HandshakeState::ReadingAuthEip8); assert_eq!(h.state, super::HandshakeState::ReadingAuthEip8);
h.read_auth_eip8(&secret, &auth[super::V4_AUTH_PACKET_SIZE..]).unwrap(); h.read_auth_eip8(&test_io(), &secret, &auth[super::V4_AUTH_PACKET_SIZE..]).unwrap();
assert_eq!(h.state, super::HandshakeState::StartSession); assert_eq!(h.state, super::HandshakeState::StartSession);
check_auth(&h, 56); check_auth(&h, 56);
let ack = h.ack_cipher.clone(); let ack = h.ack_cipher.clone();

View File

@ -215,8 +215,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> { pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
let session = self.resolve_session(peer); let session = self.resolve_session(peer);
if let Some(session) = session { if let Some(session) = session {
try!(session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data)); try!(session.lock().unwrap().deref_mut().send_packet(self.io, self.protocol, packet_id as u8, &data));
try!(self.io.update_registration(peer));
} else { } else {
trace!(target: "network", "Send: Peer no longer exist") trace!(target: "network", "Send: Peer no longer exist")
} }
@ -494,7 +493,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
for e in self.sessions.write().unwrap().iter_mut() { for e in self.sessions.write().unwrap().iter_mut() {
let mut s = e.lock().unwrap(); let mut s = e.lock().unwrap();
if !s.keep_alive(io) { if !s.keep_alive(io) {
s.disconnect(DisconnectReason::PingTimeout); s.disconnect(io, DisconnectReason::PingTimeout);
to_kill.push(s.token()); to_kill.push(s.token());
} }
} }
@ -616,9 +615,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
trace!(target: "network", "Session write error: {}: {:?}", token, e); trace!(target: "network", "Session write error: {}: {:?}", token, e);
} }
if s.done() { if s.done() {
io.deregister_stream(token).expect("Error deregistering stream"); io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e));
} else {
io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Session registration error: {:?}", e));
} }
} }
} }
@ -630,11 +627,12 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn session_readable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) { fn session_readable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
let mut ready_data: Vec<ProtocolId> = Vec::new(); let mut ready_data: Vec<ProtocolId> = Vec::new();
let mut packet_data: Option<(ProtocolId, PacketId, Vec<u8>)> = None; let mut packet_data: Vec<(ProtocolId, PacketId, Vec<u8>)> = Vec::new();
let mut kill = false; let mut kill = false;
let session = { self.sessions.read().unwrap().get(token).cloned() }; let session = { self.sessions.read().unwrap().get(token).cloned() };
if let Some(session) = session.clone() { if let Some(session) = session.clone() {
let mut s = session.lock().unwrap(); let mut s = session.lock().unwrap();
loop {
match s.readable(io, &self.info.read().unwrap()) { match s.readable(io, &self.info.read().unwrap()) {
Err(e) => { Err(e) => {
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e); trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
@ -648,13 +646,14 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
_ => (), _ => (),
} }
kill = true; kill = true;
break;
}, },
Ok(SessionData::Ready) => { Ok(SessionData::Ready) => {
if !s.info.originated { if !s.info.originated {
let session_count = self.session_count(); let session_count = self.session_count();
let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers }; let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
if session_count >= ideal_peers as usize { if session_count >= ideal_peers as usize {
s.disconnect(DisconnectReason::TooManyPeers); s.disconnect(io, DisconnectReason::TooManyPeers);
return; return;
} }
// Add it no node table // Add it no node table
@ -681,10 +680,11 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}) => { }) => {
match self.handlers.read().unwrap().get(protocol) { match self.handlers.read().unwrap().get(protocol) {
None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) }, None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) },
Some(_) => packet_data = Some((protocol, packet_id, data)), Some(_) => packet_data.push((protocol, packet_id, data)),
} }
}, },
Ok(SessionData::None) => {}, Ok(SessionData::None) => break,
}
} }
} }
if kill { if kill {
@ -695,11 +695,10 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
self.stats.inc_sessions(); self.stats.inc_sessions();
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token); h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token);
} }
if let Some((p, packet_id, data)) = packet_data { for (p, packet_id, data) in packet_data {
let h = self.handlers.read().unwrap().get(p).unwrap().clone(); let h = self.handlers.read().unwrap().get(p).unwrap().clone();
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token, packet_id, &data[1..]); h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token, packet_id, &data[1..]);
} }
io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e));
} }
fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) { fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
@ -742,9 +741,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone()), &token); h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone()), &token);
} }
if deregister { if deregister {
io.deregister_stream(token).expect("Error deregistering stream"); io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e));
} else if expired_session.is_some() {
io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Connection registration error: {:?}", e));
} }
} }
@ -874,7 +871,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
NetworkIoMessage::Disconnect(ref peer) => { NetworkIoMessage::Disconnect(ref peer) => {
let session = { self.sessions.read().unwrap().get(*peer).cloned() }; let session = { self.sessions.read().unwrap().get(*peer).cloned() };
if let Some(session) = session { if let Some(session) = session {
session.lock().unwrap().disconnect(DisconnectReason::DisconnectRequested); session.lock().unwrap().disconnect(io, DisconnectReason::DisconnectRequested);
} }
trace!(target: "network", "Disconnect requested {}", peer); trace!(target: "network", "Disconnect requested {}", peer);
self.kill_connection(*peer, io, false); self.kill_connection(*peer, io, false);
@ -882,7 +879,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
NetworkIoMessage::DisablePeer(ref peer) => { NetworkIoMessage::DisablePeer(ref peer) => {
let session = { self.sessions.read().unwrap().get(*peer).cloned() }; let session = { self.sessions.read().unwrap().get(*peer).cloned() };
if let Some(session) = session { if let Some(session) = session {
session.lock().unwrap().disconnect(DisconnectReason::DisconnectRequested); session.lock().unwrap().disconnect(io, DisconnectReason::DisconnectRequested);
if let Some(id) = session.lock().unwrap().id() { if let Some(id) = session.lock().unwrap().id() {
self.nodes.write().unwrap().mark_as_useless(id) self.nodes.write().unwrap().mark_as_useless(id)
} }

View File

@ -145,7 +145,7 @@ impl Session {
}) })
} }
fn complete_handshake(&mut self, host: &HostInfo) -> Result<(), UtilError> { fn complete_handshake<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Sync + Clone {
let connection = if let State::Handshake(ref mut h) = self.state { let connection = if let State::Handshake(ref mut h) = self.state {
self.info.id = Some(h.id.clone()); self.info.id = Some(h.id.clone());
try!(EncryptedConnection::new(h)) try!(EncryptedConnection::new(h))
@ -153,8 +153,8 @@ impl Session {
panic!("Unexpected state"); panic!("Unexpected state");
}; };
self.state = State::Session(connection); self.state = State::Session(connection);
try!(self.write_hello(host)); try!(self.write_hello(io, host));
try!(self.send_ping()); try!(self.send_ping(io));
Ok(()) Ok(())
} }
@ -220,10 +220,11 @@ impl Session {
} }
} }
if let Some(data) = packet_data { if let Some(data) = packet_data {
return Ok(try!(self.read_packet(data, host))); return Ok(try!(self.read_packet(io, data, host)));
} }
if create_session { if create_session {
try!(self.complete_handshake(host)); try!(self.complete_handshake(io, host));
io.update_registration(self.token()).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e));
} }
Ok(SessionData::None) Ok(SessionData::None)
} }
@ -263,7 +264,8 @@ impl Session {
} }
/// Send a protocol packet to peer. /// Send a protocol packet to peer.
pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> { pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError>
where Message: Send + Sync + Clone {
if self.info.capabilities.is_empty() || !self.had_hello { if self.info.capabilities.is_empty() || !self.had_hello {
debug!(target: "network", "Sending to unconfirmed session {}, protocol: {}, packet: {}", self.token(), protocol, packet_id); debug!(target: "network", "Sending to unconfirmed session {}, protocol: {}, packet: {}", self.token(), protocol, packet_id);
return Err(From::from(NetworkError::BadProtocol)); return Err(From::from(NetworkError::BadProtocol));
@ -283,7 +285,7 @@ impl Session {
let mut rlp = RlpStream::new(); let mut rlp = RlpStream::new();
rlp.append(&(pid as u32)); rlp.append(&(pid as u32));
rlp.append_raw(data, 1); rlp.append_raw(data, 1);
self.send(rlp) self.send(io, rlp)
} }
/// Keep this session alive. Returns false if ping timeout happened /// Keep this session alive. Returns false if ping timeout happened
@ -298,10 +300,9 @@ impl Session {
}; };
if !timed_out && time::precise_time_ns() - self.ping_time_ns > PING_INTERVAL_SEC * 1000_000_000 { if !timed_out && time::precise_time_ns() - self.ping_time_ns > PING_INTERVAL_SEC * 1000_000_000 {
if let Err(e) = self.send_ping() { if let Err(e) = self.send_ping(io) {
debug!("Error sending ping message: {:?}", e); debug!("Error sending ping message: {:?}", e);
} }
io.update_registration(self.token()).unwrap_or_else(|e| debug!(target: "network", "Session registration error: {:?}", e));
} }
!timed_out !timed_out
} }
@ -310,7 +311,8 @@ impl Session {
self.connection().token() self.connection().token()
} }
fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result<SessionData, UtilError> { fn read_packet<Message>(&mut self, io: &IoContext<Message>, packet: Packet, host: &HostInfo) -> Result<SessionData, UtilError>
where Message: Send + Sync + Clone {
if packet.data.len() < 2 { if packet.data.len() < 2 {
return Err(From::from(NetworkError::BadProtocol)); return Err(From::from(NetworkError::BadProtocol));
} }
@ -321,7 +323,7 @@ impl Session {
match packet_id { match packet_id {
PACKET_HELLO => { PACKET_HELLO => {
let rlp = UntrustedRlp::new(&packet.data[1..]); //TODO: validate rlp expected size let rlp = UntrustedRlp::new(&packet.data[1..]); //TODO: validate rlp expected size
try!(self.read_hello(&rlp, host)); try!(self.read_hello(io, &rlp, host));
Ok(SessionData::Ready) Ok(SessionData::Ready)
}, },
PACKET_DISCONNECT => { PACKET_DISCONNECT => {
@ -330,7 +332,7 @@ impl Session {
Err(From::from(NetworkError::Disconnect(DisconnectReason::from_u8(reason)))) Err(From::from(NetworkError::Disconnect(DisconnectReason::from_u8(reason))))
} }
PACKET_PING => { PACKET_PING => {
try!(self.send_pong()); try!(self.send_pong(io));
Ok(SessionData::None) Ok(SessionData::None)
}, },
PACKET_PONG => { PACKET_PONG => {
@ -362,7 +364,7 @@ impl Session {
} }
} }
fn write_hello(&mut self, host: &HostInfo) -> Result<(), UtilError> { fn write_hello<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Sync + Clone {
let mut rlp = RlpStream::new(); let mut rlp = RlpStream::new();
rlp.append_raw(&[PACKET_HELLO as u8], 0); rlp.append_raw(&[PACKET_HELLO as u8], 0);
rlp.begin_list(5) rlp.begin_list(5)
@ -371,10 +373,11 @@ impl Session {
.append(&host.capabilities) .append(&host.capabilities)
.append(&host.local_endpoint.address.port()) .append(&host.local_endpoint.address.port())
.append(host.id()); .append(host.id());
self.send(rlp) self.send(io, rlp)
} }
fn read_hello(&mut self, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), UtilError> { fn read_hello<Message>(&mut self, io: &IoContext<Message>, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), UtilError>
where Message: Send + Sync + Clone {
let protocol = try!(rlp.val_at::<u32>(0)); let protocol = try!(rlp.val_at::<u32>(0));
let client_version = try!(rlp.val_at::<String>(1)); let client_version = try!(rlp.val_at::<String>(1));
let peer_caps = try!(rlp.val_at::<Vec<PeerCapabilityInfo>>(2)); let peer_caps = try!(rlp.val_at::<Vec<PeerCapabilityInfo>>(2));
@ -417,36 +420,36 @@ impl Session {
self.info.capabilities = caps; self.info.capabilities = caps;
if self.info.capabilities.is_empty() { if self.info.capabilities.is_empty() {
trace!(target: "network", "No common capabilities with peer."); trace!(target: "network", "No common capabilities with peer.");
return Err(From::from(self.disconnect(DisconnectReason::UselessPeer))); return Err(From::from(self.disconnect(io, DisconnectReason::UselessPeer)));
} }
if protocol != host.protocol_version { if protocol != host.protocol_version {
trace!(target: "network", "Peer protocol version mismatch: {}", protocol); trace!(target: "network", "Peer protocol version mismatch: {}", protocol);
return Err(From::from(self.disconnect(DisconnectReason::UselessPeer))); return Err(From::from(self.disconnect(io, DisconnectReason::UselessPeer)));
} }
self.had_hello = true; self.had_hello = true;
Ok(()) Ok(())
} }
/// Senf ping packet /// Senf ping packet
pub fn send_ping(&mut self) -> Result<(), UtilError> { pub fn send_ping<Message>(&mut self, io: &IoContext<Message>) -> Result<(), UtilError> where Message: Send + Sync + Clone {
try!(self.send(try!(Session::prepare(PACKET_PING)))); try!(self.send(io, try!(Session::prepare(PACKET_PING))));
self.ping_time_ns = time::precise_time_ns(); self.ping_time_ns = time::precise_time_ns();
self.pong_time_ns = None; self.pong_time_ns = None;
Ok(()) Ok(())
} }
fn send_pong(&mut self) -> Result<(), UtilError> { fn send_pong<Message>(&mut self, io: &IoContext<Message>) -> Result<(), UtilError> where Message: Send + Sync + Clone {
self.send(try!(Session::prepare(PACKET_PONG))) self.send(io, try!(Session::prepare(PACKET_PONG)))
} }
/// Disconnect this session /// Disconnect this session
pub fn disconnect(&mut self, reason: DisconnectReason) -> NetworkError { pub fn disconnect<Message>(&mut self, io: &IoContext<Message>, reason: DisconnectReason) -> NetworkError where Message: Send + Sync + Clone {
if let State::Session(_) = self.state { if let State::Session(_) = self.state {
let mut rlp = RlpStream::new(); let mut rlp = RlpStream::new();
rlp.append(&(PACKET_DISCONNECT as u32)); rlp.append(&(PACKET_DISCONNECT as u32));
rlp.begin_list(1); rlp.begin_list(1);
rlp.append(&(reason as u32)); rlp.append(&(reason as u32));
self.send(rlp).ok(); self.send(io, rlp).ok();
} }
NetworkError::Disconnect(reason) NetworkError::Disconnect(reason)
} }
@ -458,13 +461,13 @@ impl Session {
Ok(rlp) Ok(rlp)
} }
fn send(&mut self, rlp: RlpStream) -> Result<(), UtilError> { fn send<Message>(&mut self, io: &IoContext<Message>, rlp: RlpStream) -> Result<(), UtilError> where Message: Send + Sync + Clone {
match self.state { match self.state {
State::Handshake(_) => { State::Handshake(_) => {
warn!(target:"network", "Unexpected send request"); warn!(target:"network", "Unexpected send request");
}, },
State::Session(ref mut s) => { State::Session(ref mut s) => {
try!(s.send_packet(&rlp.out())) try!(s.send_packet(io, &rlp.out()))
}, },
} }
Ok(()) Ok(())