From 6b12334136ad416f628e8c3ed4a945451177dbb3 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Mon, 13 Jun 2016 18:55:24 +0200 Subject: [PATCH] Windows build (#1253) * Networking refactoring * Fixed typo * Trace logging * Updated dependencies for windows build * Windows fixes * use mio 0.5 * nix build * Windows build fix * style * removed unused import * ipc crate version bump * ipc config for named pipes * tweaks and fixes * tweaks and fixes * final version bump * Fixed tests * Disable color output on windows * Added missing doc --- Cargo.lock | 30 ++++++-- Cargo.toml | 4 +- parity/configuration.rs | 24 +++++- parity/main.rs | 33 +++++--- parity/rpc.rs | 26 +++++-- util/Cargo.toml | 2 +- util/src/io/service.rs | 2 +- util/src/keys/directory.rs | 8 ++ util/src/network/connection.rs | 134 ++++++++++++++++++++------------- util/src/network/handshake.rs | 54 +++++++------ util/src/network/host.rs | 121 +++++++++++++++-------------- util/src/network/session.rs | 53 +++++++------ 12 files changed, 293 insertions(+), 198 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 52e053d66..c850940c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,7 +20,7 @@ dependencies = [ "ethsync 1.2.0", "fdlimit 0.1.0", "hyper 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "json-ipc-server 0.1.0 (git+https://github.com/ethcore/json-ipc-server.git)", + "json-ipc-server 0.2.2 (git+https://github.com/ethcore/json-ipc-server.git)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "number_prefix 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -346,7 +346,7 @@ dependencies = [ "ethcore-util 1.2.0", "ethjson 0.1.0", "ethsync 1.2.0", - "json-ipc-server 0.1.0 (git+https://github.com/ethcore/json-ipc-server.git)", + "json-ipc-server 0.2.2 (git+https://github.com/ethcore/json-ipc-server.git)", "jsonrpc-core 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-http-server 5.1.0 (git+https://github.com/ethcore/jsonrpc-http-server.git)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -394,7 +394,7 @@ dependencies = [ "lazy_static 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.5.1 (git+https://github.com/ethcore/mio?branch=v0.5.x)", "nix 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rocksdb 0.4.5 (git+https://github.com/ethcore/rust-rocksdb)", @@ -570,12 +570,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "json-ipc-server" -version = "0.1.0" -source = "git+https://github.com/ethcore/json-ipc-server.git#4f9226c4f84dcce2385a188374e3b5fc66b63e68" +version = "0.2.2" +source = "git+https://github.com/ethcore/json-ipc-server.git#15ef25e5f859d2d27469c92cc13dd1ddea03e444" dependencies = [ "bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "miow 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -704,6 +708,22 @@ dependencies = [ "winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "mio" +version = "0.5.1" +source = "git+https://github.com/ethcore/mio?branch=v0.5.x#1fc881771fb8c2517317b4f805d7b88235be422b" +dependencies = [ + "bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "miow 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "net2 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)", + "nix 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "mio" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index f7f805d30..ff624fa1f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,6 @@ docopt = "0.6" time = "0.1" ctrlc = { git = "https://github.com/ethcore/rust-ctrlc.git" } fdlimit = { path = "util/fdlimit" } -daemonize = "0.2" num_cpus = "0.2" number_prefix = "0.2" rpassword = "0.2.1" @@ -37,6 +36,9 @@ ethcore-ipc = { path = "ipc/rpc" } json-ipc-server = { git = "https://github.com/ethcore/json-ipc-server.git" } ansi_term = "0.7" +[target.'cfg(not(windows))'.dependencies] +daemonize = "0.2" + [dependencies.hyper] version = "0.8" default-features = false diff --git a/parity/configuration.rs b/parity/configuration.rs index 3076e4e68..1e14cd1de 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -287,9 +287,14 @@ impl Configuration { } fn geth_ipc_path(&self) -> String { - if self.args.flag_testnet { path::ethereum::with_testnet("geth.ipc") } - else { path::ethereum::with_default("geth.ipc") } - .to_str().unwrap().to_owned() + if cfg!(windows) { + r"\\.\pipe\geth.ipc".to_owned() + } + else { + if self.args.flag_testnet { path::ethereum::with_testnet("geth.ipc") } + else { path::ethereum::with_default("geth.ipc") } + .to_str().unwrap().to_owned() + } } pub fn keys_iterations(&self) -> u32 { @@ -358,7 +363,18 @@ impl Configuration { fn ipc_path(&self) -> String { if self.args.flag_geth { self.geth_ipc_path() } - else { Configuration::replace_home(&self.args.flag_ipcpath.clone().unwrap_or(self.args.flag_ipc_path.clone())) } + else { + if cfg!(windows) { + r"\\.\pipe\parity.jsonrpc".to_owned() + } + else { + Configuration::replace_home(&self.args.flag_ipcpath.clone().unwrap_or(self.args.flag_ipc_path.clone())) + } + } + } + + pub fn have_color(&self) -> bool { + !self.args.flag_no_color && !cfg!(windows) } pub fn signer_port(&self) -> Option { diff --git a/parity/main.rs b/parity/main.rs index 0d536e28a..679bc012a 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -32,6 +32,7 @@ extern crate log as rlog; extern crate env_logger; extern crate ctrlc; extern crate fdlimit; +#[cfg(not(windows))] extern crate daemonize; extern crate time; extern crate number_prefix; @@ -86,7 +87,6 @@ use ethcore::service::ClientService; use ethcore::spec::Spec; use ethsync::EthSync; use ethcore::miner::{Miner, MinerService, ExternalMiner}; -use daemonize::Daemonize; use migration::migrate; use informant::Informant; @@ -115,11 +115,7 @@ fn execute(conf: Configuration) { execute_upgrades(&conf, &spec, &client_config); if conf.args.cmd_daemon { - Daemonize::new() - .pid_file(conf.args.arg_pid_file.clone()) - .chown_pid_file(true) - .start() - .unwrap_or_else(|e| die!("Couldn't daemonize; {}", e)); + daemonize(&conf); } if conf.args.cmd_account { @@ -145,6 +141,20 @@ fn execute(conf: Configuration) { execute_client(conf, spec, client_config); } +#[cfg(not(windows))] +fn daemonize(conf: &Configuration) { + use daemonize::Daemonize; + Daemonize::new() + .pid_file(conf.args.arg_pid_file.clone()) + .chown_pid_file(true) + .start() + .unwrap_or_else(|e| die!("Couldn't daemonize; {}", e)); +} + +#[cfg(windows)] +fn daemonize(_conf: &Configuration) { +} + fn execute_upgrades(conf: &Configuration, spec: &Spec, client_config: &ClientConfig) { match ::upgrade::upgrade(Some(&conf.path())) { Ok(upgrades_applied) if upgrades_applied > 0 => { @@ -228,6 +238,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) // setup ipc rpc let _ipc_server = rpc::new_ipc(conf.ipc_settings(), &dependencies); + debug!("IPC: {}", conf.ipc_settings()); if conf.args.flag_webapp { println!("WARNING: Flag -w/--webapp is deprecated. Dapps server is now on by default. Ignoring."); } let dapps_server = dapps::new(dapps::Configuration { @@ -255,7 +266,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) // Register IO handler let io_handler = Arc::new(ClientIoHandler { client: service.client(), - info: Informant::new(!conf.args.flag_no_color), + info: Informant::new(conf.have_color()), sync: sync.clone(), accounts: account_service.clone(), }); @@ -375,8 +386,8 @@ fn execute_import(conf: Configuration) { panic_handler.forward_from(&service); let client = service.client(); - let mut instream: Box = if let Some(f) = conf.args.arg_file { - let f = File::open(&f).unwrap_or_else(|_| die!("Cannot open the file given: {}", f)); + let mut instream: Box = if let Some(ref f) = conf.args.arg_file { + let f = File::open(f).unwrap_or_else(|_| die!("Cannot open the file given: {}", f)); Box::new(f) } else { Box::new(::std::io::stdin()) @@ -386,7 +397,7 @@ fn execute_import(conf: Configuration) { let mut first_read = 0; let format = match conf.args.flag_format { - Some(x) => match x.deref() { + Some(ref x) => match x.deref() { "binary" | "bin" => DataFormat::Binary, "hex" => DataFormat::Hex, x => die!("Invalid --format parameter given: {:?}", x), @@ -407,7 +418,7 @@ fn execute_import(conf: Configuration) { } }; - let informant = Informant::new(!conf.args.flag_no_color); + let informant = Informant::new(conf.have_color()); let do_import = |bytes| { while client.queue_info().is_full() { sleep(Duration::from_secs(1)); } diff --git a/parity/rpc.rs b/parity/rpc.rs index 66f504408..c10635889 100644 --- a/parity/rpc.rs +++ b/parity/rpc.rs @@ -22,6 +22,7 @@ use util::panics::PanicHandler; use die::*; use jsonipc; use rpc_apis; +use std::fmt; #[cfg(feature = "rpc")] pub use ethcore_rpc::Server as RpcServer; @@ -44,6 +45,17 @@ pub struct IpcConfiguration { pub apis: String, } +impl fmt::Display for IpcConfiguration { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if self.enabled { + write!(f, "endpoint address [{}], api list [{}]", self.socket_addr, self.apis) + } + else { + write!(f, "disabled") + } + } +} + pub struct Dependencies { pub panic_handler: Arc, pub apis: Arc, @@ -66,12 +78,6 @@ pub fn new_http(conf: HttpConfiguration, deps: &Dependencies) -> Option Option { - if !conf.enabled { return None; } - let apis = conf.apis.split(',').collect(); - Some(setup_ipc_rpc_server(deps, &conf.socket_addr, apis)) -} - fn setup_rpc_server(apis: Vec<&str>, deps: &Dependencies) -> Server { let apis = rpc_apis::from_str(apis); let server = Server::new(); @@ -109,10 +115,18 @@ pub fn setup_http_rpc_server( }, } } + #[cfg(not(feature = "rpc"))] pub fn setup_ipc_rpc_server(_dependencies: &Dependencies, _addr: &str, _apis: Vec<&str>) -> ! { die!("Your Parity version has been compiled without JSON-RPC support.") } + +pub fn new_ipc(conf: IpcConfiguration, deps: &Dependencies) -> Option { + if !conf.enabled { return None; } + let apis = conf.apis.split(',').collect(); + Some(setup_ipc_rpc_server(deps, &conf.socket_addr, apis)) +} + #[cfg(feature = "rpc")] pub fn setup_ipc_rpc_server(dependencies: &Dependencies, addr: &str, apis: Vec<&str>) -> jsonipc::Server { let server = setup_rpc_server(apis, dependencies); diff --git a/util/Cargo.toml b/util/Cargo.toml index 173acd528..05e05d4c1 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -12,7 +12,7 @@ log = "0.3" env_logger = "0.3" rustc-serialize = "0.3" arrayvec = "0.3" -mio = "0.5.1" +mio = { git = "https://github.com/ethcore/mio", branch = "v0.5.x" } nix ="0.5.0" rand = "0.3.12" time = "0.1.34" diff --git a/util/src/io/service.rs b/util/src/io/service.rs index d946463f5..409667c46 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -335,7 +335,7 @@ impl IoService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop pub fn start() -> Result, UtilError> { let panic_handler = PanicHandler::new_in_arc(); - let mut event_loop = EventLoop::new().unwrap(); + let mut event_loop = EventLoop::new().unwrap(); let channel = event_loop.channel(); let panic = panic_handler.clone(); let thread = thread::spawn(move || { diff --git a/util/src/keys/directory.rs b/util/src/keys/directory.rs index 20be7df7b..d9d453409 100644 --- a/util/src/keys/directory.rs +++ b/util/src/keys/directory.rs @@ -466,6 +466,7 @@ pub struct KeyDirectory { } /// Restricts the permissions of given path only to the owner. +#[cfg(not(windows))] pub fn restrict_permissions_owner(file_path: &Path) -> Result<(), i32> { let cstr = ::std::ffi::CString::new(file_path.to_str().unwrap()).unwrap(); match unsafe { ::libc::chmod(cstr.as_ptr(), ::libc::S_IWUSR | ::libc::S_IRUSR) } { @@ -474,6 +475,13 @@ pub fn restrict_permissions_owner(file_path: &Path) -> Result<(), i32> { } } +/// Restricts the permissions of given path only to the owner. +#[cfg(windows)] +pub fn restrict_permissions_owner(_file_path: &Path) -> Result<(), i32> { + //TODO: implement me + Ok(()) +} + impl KeyDirectory { /// Initializes new cache directory context with a given `path` pub fn new(path: &Path) -> KeyDirectory { diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index 3f20b8f7b..ade06b469 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::collections::VecDeque; use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use mio::{Handler, Token, EventSet, EventLoop, PollOpt, TryRead, TryWrite}; use mio::tcp::*; use hash::*; @@ -60,45 +61,57 @@ pub struct GenericConnection { interest: EventSet, /// Shared network statistics stats: Arc, + /// Registered flag + registered: AtomicBool, } impl GenericConnection { pub fn expect(&mut self, size: usize) { + trace!(target:"network", "Expect to read {} bytes", size); if self.rec_size != self.rec_buf.len() { - warn!(target:"net", "Unexpected connection read start"); + warn!(target:"network", "Unexpected connection read start"); } - unsafe { self.rec_buf.set_len(0) } self.rec_size = size; } /// Readable IO handler. Called when there is some data to be read. pub fn readable(&mut self) -> io::Result> { if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size { - warn!(target:"net", "Unexpected connection read"); + warn!(target:"network", "Unexpected connection read"); } - let max = self.rec_size - self.rec_buf.len(); - // resolve "multiple applicable items in scope [E0034]" error let sock_ref = ::by_ref(&mut self.socket); - match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) { - Ok(Some(size)) if size != 0 => { - self.stats.inc_recv(size); - if self.rec_size != 0 && self.rec_buf.len() == self.rec_size { - self.rec_size = 0; - Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new()))) - } else { Ok(None) } - }, - Ok(_) => Ok(None), - Err(e) => Err(e), - } - } + loop { + let max = self.rec_size - self.rec_buf.len(); + match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) { + Ok(Some(size)) if size != 0 => { + self.stats.inc_recv(size); + trace!(target:"network", "{}: Read {} of {} bytes", self.token, self.rec_buf.len(), self.rec_size); + if self.rec_size != 0 && self.rec_buf.len() == self.rec_size { + self.rec_size = 0; + return Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new()))) + } + else if self.rec_buf.len() > self.rec_size { + warn!(target:"network", "Read past buffer {} bytes", self.rec_buf.len() - self.rec_size); + return Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new()))) + } + }, + Ok(_) => return Ok(None), + Err(e) => { + debug!(target:"network", "Read error {} ({})", self.token, e); + return Err(e) + } + } + } + } /// Add a packet to send queue. - pub fn send(&mut self, data: Bytes) { + pub fn send(&mut self, io: &IoContext, data: Bytes) where Message: Send + Clone { if !data.is_empty() { self.send_queue.push_back(Cursor::new(data)); } if !self.interest.is_writable() { self.interest.insert(EventSet::writable()); + io.update_registration(self.token).ok(); } } @@ -108,7 +121,7 @@ impl GenericConnection { } /// Writable IO handler. Called when the socket is ready to send. - pub fn writable(&mut self) -> io::Result { + pub fn writable(&mut self, io: &IoContext) -> Result where Message: Send + Clone { if self.send_queue.is_empty() { return Ok(WriteStatus::Complete) } @@ -121,7 +134,6 @@ impl GenericConnection { } match self.socket.try_write_buf(buf) { Ok(Some(size)) if (buf.position() as usize) < send_size => { - self.interest.insert(EventSet::writable()); self.stats.inc_send(size); Ok(WriteStatus::Ongoing) }, @@ -131,7 +143,7 @@ impl GenericConnection { }, Ok(Some(_)) => { panic!("Wrote past buffer");}, Ok(None) => Ok(WriteStatus::Ongoing), - Err(e) => Err(e) + Err(e) => try!(Err(e)) } }.and_then(|r| { if r == WriteStatus::Complete { @@ -139,9 +151,7 @@ impl GenericConnection { } if self.send_queue.is_empty() { self.interest.remove(EventSet::writable()); - } - else { - self.interest.insert(EventSet::writable()); + try!(io.update_registration(self.token)); } Ok(r) }) @@ -162,6 +172,7 @@ impl Connection { rec_size: 0, interest: EventSet::hup() | EventSet::readable(), stats: stats, + registered: AtomicBool::new(false), } } @@ -188,27 +199,36 @@ impl Connection { rec_buf: Vec::new(), rec_size: 0, send_queue: self.send_queue.clone(), - interest: EventSet::hup() | EventSet::readable(), + interest: EventSet::hup(), stats: self.stats.clone(), + registered: AtomicBool::new(false), }) } /// Register this connection with the IO event loop. pub fn register_socket(&self, reg: Token, event_loop: &mut EventLoop) -> io::Result<()> { + if self.registered.load(AtomicOrdering::SeqCst) { + return Ok(()); + } trace!(target: "network", "connection register; token={:?}", reg); if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */) { // TODO: oneshot is broken on windows trace!(target: "network", "Failed to register {:?}, {:?}", reg, e); } + self.registered.store(true, AtomicOrdering::SeqCst); Ok(()) } /// Update connection registration. Should be called at the end of the IO handler. pub fn update_socket(&self, reg: Token, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "network", "connection reregister; token={:?}", reg); - event_loop.reregister( &self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */ ).or_else(|e| { // TODO: oneshot is broken on windows - trace!(target: "network", "Failed to reregister {:?}, {:?}", reg, e); + if !self.registered.load(AtomicOrdering::SeqCst) { + self.register_socket(reg, event_loop) + } else { + event_loop.reregister(&self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */ ).unwrap_or_else(|e| { // TODO: oneshot is broken on windows + trace!(target: "network", "Failed to reregister {:?}, {:?}", reg, e); + }); Ok(()) - }) + } } /// Delete connection registration. Should be called at the end of the IO handler. @@ -266,7 +286,7 @@ pub struct EncryptedConnection { } impl EncryptedConnection { - /// Create an encrypted connection out of the handshake. Consumes a handshake object. + /// Create an encrypted connection out of the handshake. pub fn new(handshake: &mut Handshake) -> Result { let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_ephemeral)); let mut nonce_material = H512::new(); @@ -320,7 +340,7 @@ impl EncryptedConnection { } /// Send a packet - pub fn send_packet(&mut self, payload: &[u8]) -> Result<(), UtilError> { + pub fn send_packet(&mut self, io: &IoContext, payload: &[u8]) -> Result<(), UtilError> where Message: Send + Clone { let mut header = RlpStream::new(); let len = payload.len() as usize; header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1); @@ -342,7 +362,7 @@ impl EncryptedConnection { self.egress_mac.update(&packet[32..(32 + len + padding)]); EncryptedConnection::update_mac(&mut self.egress_mac, &mut self.mac_encoder, &[0u8; 0]); self.egress_mac.clone().finalize(&mut packet[(32 + len + padding)..]); - self.connection.send(packet); + self.connection.send(io, packet); Ok(()) } @@ -417,31 +437,29 @@ impl EncryptedConnection { /// Readable IO handler. Tracker receive status and returns decoded packet if avaialable. pub fn readable(&mut self, io: &IoContext) -> Result, UtilError> where Message: Send + Clone{ io.clear_timer(self.connection.token).unwrap(); - match self.read_state { - EncryptedConnectionState::Header => { - if let Some(data) = try!(self.connection.readable()) { - try!(self.read_header(&data)); - try!(io.register_timer(self.connection.token, RECIEVE_PAYLOAD_TIMEOUT)); - } - Ok(None) - }, - EncryptedConnectionState::Payload => { - match try!(self.connection.readable()) { - Some(data) => { - self.read_state = EncryptedConnectionState::Header; - self.connection.expect(ENCRYPTED_HEADER_LEN); - Ok(Some(try!(self.read_payload(&data)))) - }, - None => Ok(None) - } + if let EncryptedConnectionState::Header = self.read_state { + if let Some(data) = try!(self.connection.readable()) { + try!(self.read_header(&data)); + try!(io.register_timer(self.connection.token, RECIEVE_PAYLOAD_TIMEOUT)); } + }; + if let EncryptedConnectionState::Payload = self.read_state { + match try!(self.connection.readable()) { + Some(data) => { + self.read_state = EncryptedConnectionState::Header; + self.connection.expect(ENCRYPTED_HEADER_LEN); + Ok(Some(try!(self.read_payload(&data)))) + }, + None => Ok(None) + } + } else { + Ok(None) } } /// Writable IO handler. Processes send queeue. pub fn writable(&mut self, io: &IoContext) -> Result<(), UtilError> where Message: Send + Clone { - io.clear_timer(self.connection.token).unwrap(); - try!(self.connection.writable()); + try!(self.connection.writable(io)); Ok(()) } } @@ -472,12 +490,14 @@ pub fn test_encryption() { mod tests { use super::*; use std::sync::*; + use std::sync::atomic::AtomicBool; use super::super::stats::*; use std::io::{Read, Write, Error, Cursor, ErrorKind}; use mio::{EventSet}; use std::collections::VecDeque; use bytes::*; use devtools::*; + use io::*; impl GenericSocket for TestSocket {} @@ -521,6 +541,7 @@ mod tests { rec_size: 0, interest: EventSet::hup() | EventSet::readable(), stats: Arc::::new(NetworkStats::new()), + registered: AtomicBool::new(false), } } } @@ -543,10 +564,15 @@ mod tests { rec_size: 0, interest: EventSet::hup() | EventSet::readable(), stats: Arc::::new(NetworkStats::new()), + registered: AtomicBool::new(false), } } } + fn test_io() -> IoContext { + IoContext::new(IoChannel::disconnected(), 0) + } + #[test] fn connection_expect() { let mut connection = TestConnection::new(); @@ -557,7 +583,7 @@ mod tests { #[test] fn connection_write_empty() { let mut connection = TestConnection::new(); - let status = connection.writable(); + let status = connection.writable(&test_io()); assert!(status.is_ok()); assert!(WriteStatus::Complete == status.unwrap()); } @@ -568,7 +594,7 @@ mod tests { let data = Cursor::new(vec![0; 10240]); connection.send_queue.push_back(data); - let status = connection.writable(); + let status = connection.writable(&test_io()); assert!(status.is_ok()); assert!(WriteStatus::Complete == status.unwrap()); assert_eq!(10240, connection.socket.write_buffer.len()); @@ -581,7 +607,7 @@ mod tests { let data = Cursor::new(vec![0; 10240]); connection.send_queue.push_back(data); - let status = connection.writable(); + let status = connection.writable(&test_io()); assert!(status.is_ok()); assert!(WriteStatus::Ongoing == status.unwrap()); @@ -594,7 +620,7 @@ mod tests { let data = Cursor::new(vec![0; 10240]); connection.send_queue.push_back(data); - let status = connection.writable(); + let status = connection.writable(&test_io()); assert!(!status.is_ok()); assert_eq!(1, connection.send_queue.len()); diff --git a/util/src/network/handshake.rs b/util/src/network/handshake.rs index e02da3d4c..90e3bc67d 100644 --- a/util/src/network/handshake.rs +++ b/util/src/network/handshake.rs @@ -111,7 +111,7 @@ impl Handshake { self.originated = originated; io.register_timer(self.connection.token, HANDSHAKE_TIMEOUT).ok(); if originated { - try!(self.write_auth(host.secret(), host.id())); + try!(self.write_auth(io, host.secret(), host.id())); } else { self.state = HandshakeState::ReadingAuth; @@ -128,17 +128,17 @@ impl Handshake { /// Readable IO handler. Drives the state change. pub fn readable(&mut self, io: &IoContext, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone { if !self.expired() { - io.clear_timer(self.connection.token).unwrap(); + io.clear_timer(self.connection.token).ok(); match self.state { HandshakeState::New => {} HandshakeState::ReadingAuth => { if let Some(data) = try!(self.connection.readable()) { - try!(self.read_auth(host.secret(), &data)); + try!(self.read_auth(io, host.secret(), &data)); }; }, HandshakeState::ReadingAuthEip8 => { if let Some(data) = try!(self.connection.readable()) { - try!(self.read_auth_eip8(host.secret(), &data)); + try!(self.read_auth_eip8(io, host.secret(), &data)); }; }, HandshakeState::ReadingAck => { @@ -153,9 +153,6 @@ impl Handshake { }, HandshakeState::StartSession => {}, } - if self.state != HandshakeState::StartSession { - try!(io.update_registration(self.connection.token)); - } } Ok(()) } @@ -163,11 +160,7 @@ impl Handshake { /// Writabe IO handler. pub fn writable(&mut self, io: &IoContext) -> Result<(), UtilError> where Message: Send + Clone { if !self.expired() { - io.clear_timer(self.connection.token).unwrap(); - try!(self.connection.writable()); - if self.state != HandshakeState::StartSession { - io.update_registration(self.connection.token).unwrap(); - } + try!(self.connection.writable(io)); } Ok(()) } @@ -183,7 +176,7 @@ impl Handshake { } /// Parse, validate and confirm auth message - fn read_auth(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { + fn read_auth(&mut self, io: &IoContext, secret: &Secret, data: &[u8]) -> Result<(), UtilError> where Message: Send + Clone { trace!(target:"network", "Received handshake auth from {:?}", self.connection.remote_addr_str()); if data.len() != V4_AUTH_PACKET_SIZE { debug!(target:"net", "Wrong auth packet size"); @@ -197,7 +190,7 @@ impl Handshake { let (pubk, rest) = rest.split_at(64); let (nonce, _) = rest.split_at(32); try!(self.set_auth(secret, sig, pubk, nonce, PROTOCOL_VERSION)); - try!(self.write_ack()); + try!(self.write_ack(io)); } Err(_) => { // Try to interpret as EIP-8 packet @@ -214,7 +207,7 @@ impl Handshake { Ok(()) } - fn read_auth_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { + fn read_auth_eip8(&mut self, io: &IoContext, secret: &Secret, data: &[u8]) -> Result<(), UtilError> where Message: Send + Clone { trace!(target:"network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str()); self.auth_cipher.extend_from_slice(data); let auth = try!(ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..])); @@ -224,13 +217,13 @@ impl Handshake { let remote_nonce: H256 = try!(rlp.val_at(2)); let remote_version: u64 = try!(rlp.val_at(3)); try!(self.set_auth(secret, &signature, &remote_public, &remote_nonce, remote_version)); - try!(self.write_ack_eip8()); + try!(self.write_ack_eip8(io)); Ok(()) } /// Parse and validate ack message fn read_ack(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { - trace!(target:"network", "Received handshake auth to {:?}", self.connection.remote_addr_str()); + trace!(target:"network", "Received handshake ack from {:?}", self.connection.remote_addr_str()); if data.len() != V4_ACK_PACKET_SIZE { debug!(target:"net", "Wrong ack packet size"); return Err(From::from(NetworkError::BadProtocol)); @@ -270,7 +263,7 @@ impl Handshake { } /// Sends auth message - fn write_auth(&mut self, secret: &Secret, public: &Public) -> Result<(), UtilError> { + fn write_auth(&mut self, io: &IoContext, secret: &Secret, public: &Public) -> Result<(), UtilError> where Message: Send + Clone { trace!(target:"network", "Sending handshake auth to {:?}", self.connection.remote_addr_str()); let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants let len = data.len(); @@ -290,14 +283,14 @@ impl Handshake { } let message = try!(crypto::ecies::encrypt(&self.id, &[], &data)); self.auth_cipher = message.clone(); - self.connection.send(message); + self.connection.send(io, message); self.connection.expect(V4_ACK_PACKET_SIZE); self.state = HandshakeState::ReadingAck; Ok(()) } /// Sends ack message - fn write_ack(&mut self) -> Result<(), UtilError> { + fn write_ack(&mut self, io: &IoContext) -> Result<(), UtilError> where Message: Send + Clone { trace!(target:"network", "Sending handshake ack to {:?}", self.connection.remote_addr_str()); let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants let len = data.len(); @@ -310,13 +303,13 @@ impl Handshake { } let message = try!(crypto::ecies::encrypt(&self.id, &[], &data)); self.ack_cipher = message.clone(); - self.connection.send(message); + self.connection.send(io, message); self.state = HandshakeState::StartSession; Ok(()) } /// Sends EIP8 ack message - fn write_ack_eip8(&mut self) -> Result<(), UtilError> { + fn write_ack_eip8(&mut self, io: &IoContext) -> Result<(), UtilError> where Message: Send + Clone { trace!(target:"network", "Sending EIP8 handshake ack to {:?}", self.connection.remote_addr_str()); let mut rlp = RlpStream::new_list(3); rlp.append(self.ecdhe.public()); @@ -333,7 +326,7 @@ impl Handshake { let message = try!(crypto::ecies::encrypt(&self.id, &prefix, &encoded)); self.ack_cipher.extend_from_slice(&prefix); self.ack_cipher.extend_from_slice(&message); - self.connection.send(self.ack_cipher.clone()); + self.connection.send(io, self.ack_cipher.clone()); self.state = HandshakeState::StartSession; Ok(()) } @@ -347,6 +340,7 @@ mod test { use super::*; use crypto::*; use hash::*; + use io::*; use std::net::SocketAddr; use mio::tcp::TcpStream; use network::stats::NetworkStats; @@ -371,6 +365,10 @@ mod test { Handshake::new(0, to, socket, &nonce, Arc::new(NetworkStats::new())).unwrap() } + fn test_io() -> IoContext { + IoContext::new(IoChannel::disconnected(), 0) + } + #[test] fn test_handshake_auth_plain() { let mut h = create_handshake(None); @@ -387,7 +385,7 @@ mod test { a4592ee77e2bd94d0be3691f3b406f9bba9b591fc63facc016bfa8\ ".from_hex().unwrap(); - h.read_auth(&secret, &auth).unwrap(); + h.read_auth(&test_io(), &secret, &auth).unwrap(); assert_eq!(h.state, super::HandshakeState::StartSession); check_auth(&h, 4); } @@ -411,9 +409,9 @@ mod test { 3bf7678318e2d5b5340c9e488eefea198576344afbdf66db5f51204a6961a63ce072c8926c\ ".from_hex().unwrap(); - h.read_auth(&secret, &auth[0..super::V4_AUTH_PACKET_SIZE]).unwrap(); + h.read_auth(&test_io(), &secret, &auth[0..super::V4_AUTH_PACKET_SIZE]).unwrap(); assert_eq!(h.state, super::HandshakeState::ReadingAuthEip8); - h.read_auth_eip8(&secret, &auth[super::V4_AUTH_PACKET_SIZE..]).unwrap(); + h.read_auth_eip8(&test_io(), &secret, &auth[super::V4_AUTH_PACKET_SIZE..]).unwrap(); assert_eq!(h.state, super::HandshakeState::StartSession); check_auth(&h, 4); } @@ -438,9 +436,9 @@ mod test { d490\ ".from_hex().unwrap(); - h.read_auth(&secret, &auth[0..super::V4_AUTH_PACKET_SIZE]).unwrap(); + h.read_auth(&test_io(), &secret, &auth[0..super::V4_AUTH_PACKET_SIZE]).unwrap(); assert_eq!(h.state, super::HandshakeState::ReadingAuthEip8); - h.read_auth_eip8(&secret, &auth[super::V4_AUTH_PACKET_SIZE..]).unwrap(); + h.read_auth_eip8(&test_io(), &secret, &auth[super::V4_AUTH_PACKET_SIZE..]).unwrap(); assert_eq!(h.state, super::HandshakeState::StartSession); check_auth(&h, 56); let ack = h.ack_cipher.clone(); diff --git a/util/src/network/host.rs b/util/src/network/host.rs index fe139e383..e5853b8db 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -215,8 +215,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { let session = self.resolve_session(peer); if let Some(session) = session { - try!(session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data)); - try!(self.io.update_registration(peer)); + try!(session.lock().unwrap().deref_mut().send_packet(self.io, self.protocol, packet_id as u8, &data)); } else { trace!(target: "network", "Send: Peer no longer exist") } @@ -494,7 +493,7 @@ impl Host where Message: Send + Sync + Clone { for e in self.sessions.write().unwrap().iter_mut() { let mut s = e.lock().unwrap(); if !s.keep_alive(io) { - s.disconnect(DisconnectReason::PingTimeout); + s.disconnect(io, DisconnectReason::PingTimeout); to_kill.push(s.token()); } } @@ -616,10 +615,8 @@ impl Host where Message: Send + Sync + Clone { trace!(target: "network", "Session write error: {}: {:?}", token, e); } if s.done() { - io.deregister_stream(token).expect("Error deregistering stream"); - } else { - io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Session registration error: {:?}", e)); - } + io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e)); + } } } @@ -630,62 +627,65 @@ impl Host where Message: Send + Sync + Clone { fn session_readable(&self, token: StreamToken, io: &IoContext>) { let mut ready_data: Vec = Vec::new(); - let mut packet_data: Option<(ProtocolId, PacketId, Vec)> = None; + let mut packet_data: Vec<(ProtocolId, PacketId, Vec)> = Vec::new(); let mut kill = false; let session = { self.sessions.read().unwrap().get(token).cloned() }; if let Some(session) = session.clone() { let mut s = session.lock().unwrap(); - match s.readable(io, &self.info.read().unwrap()) { - Err(e) => { - trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e); - match e { - UtilError::Network(NetworkError::Disconnect(DisconnectReason::UselessPeer)) | - UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) => { - if let Some(id) = s.id() { - self.nodes.write().unwrap().mark_as_useless(id); + loop { + match s.readable(io, &self.info.read().unwrap()) { + Err(e) => { + trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e); + match e { + UtilError::Network(NetworkError::Disconnect(DisconnectReason::UselessPeer)) | + UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) => { + if let Some(id) = s.id() { + self.nodes.write().unwrap().mark_as_useless(id); + } + } + _ => (), + } + kill = true; + break; + }, + Ok(SessionData::Ready) => { + if !s.info.originated { + let session_count = self.session_count(); + let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers }; + if session_count >= ideal_peers as usize { + s.disconnect(io, DisconnectReason::TooManyPeers); + return; + } + // Add it no node table + if let Ok(address) = s.remote_addr() { + let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } }; + self.nodes.write().unwrap().add_node(Node::new(entry.id.clone(), entry.endpoint.clone())); + let mut discovery = self.discovery.lock().unwrap(); + if let Some(ref mut discovery) = *discovery.deref_mut() { + discovery.add_node(entry); + } } } - _ => (), - } - kill = true; - }, - Ok(SessionData::Ready) => { - if !s.info.originated { - let session_count = self.session_count(); - let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers }; - if session_count >= ideal_peers as usize { - s.disconnect(DisconnectReason::TooManyPeers); - return; - } - // Add it no node table - if let Ok(address) = s.remote_addr() { - let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } }; - self.nodes.write().unwrap().add_node(Node::new(entry.id.clone(), entry.endpoint.clone())); - let mut discovery = self.discovery.lock().unwrap(); - if let Some(ref mut discovery) = *discovery.deref_mut() { - discovery.add_node(entry); + self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst); + for (p, _) in self.handlers.read().unwrap().iter() { + if s.have_capability(p) { + ready_data.push(p); } } - } - self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst); - for (p, _) in self.handlers.read().unwrap().iter() { - if s.have_capability(p) { - ready_data.push(p); + }, + Ok(SessionData::Packet { + data, + protocol, + packet_id, + }) => { + match self.handlers.read().unwrap().get(protocol) { + None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) }, + Some(_) => packet_data.push((protocol, packet_id, data)), } - } - }, - Ok(SessionData::Packet { - data, - protocol, - packet_id, - }) => { - match self.handlers.read().unwrap().get(protocol) { - None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) }, - Some(_) => packet_data = Some((protocol, packet_id, data)), - } - }, - Ok(SessionData::None) => {}, - } + }, + Ok(SessionData::None) => break, + } + } } if kill { self.kill_connection(token, io, true); @@ -695,11 +695,10 @@ impl Host where Message: Send + Sync + Clone { self.stats.inc_sessions(); h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token); } - if let Some((p, packet_id, data)) = packet_data { + for (p, packet_id, data) in packet_data { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token, packet_id, &data[1..]); } - io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e)); } fn connection_timeout(&self, token: StreamToken, io: &IoContext>) { @@ -742,10 +741,8 @@ impl Host where Message: Send + Sync + Clone { h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone()), &token); } if deregister { - io.deregister_stream(token).expect("Error deregistering stream"); - } else if expired_session.is_some() { - io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Connection registration error: {:?}", e)); - } + io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e)); + } } fn update_nodes(&self, io: &IoContext>, node_changes: TableUpdates) { @@ -874,7 +871,7 @@ impl IoHandler> for Host where Messa NetworkIoMessage::Disconnect(ref peer) => { let session = { self.sessions.read().unwrap().get(*peer).cloned() }; if let Some(session) = session { - session.lock().unwrap().disconnect(DisconnectReason::DisconnectRequested); + session.lock().unwrap().disconnect(io, DisconnectReason::DisconnectRequested); } trace!(target: "network", "Disconnect requested {}", peer); self.kill_connection(*peer, io, false); @@ -882,7 +879,7 @@ impl IoHandler> for Host where Messa NetworkIoMessage::DisablePeer(ref peer) => { let session = { self.sessions.read().unwrap().get(*peer).cloned() }; if let Some(session) = session { - session.lock().unwrap().disconnect(DisconnectReason::DisconnectRequested); + session.lock().unwrap().disconnect(io, DisconnectReason::DisconnectRequested); if let Some(id) = session.lock().unwrap().id() { self.nodes.write().unwrap().mark_as_useless(id) } diff --git a/util/src/network/session.rs b/util/src/network/session.rs index 7b7f16c18..d5fd33813 100644 --- a/util/src/network/session.rs +++ b/util/src/network/session.rs @@ -145,7 +145,7 @@ impl Session { }) } - fn complete_handshake(&mut self, host: &HostInfo) -> Result<(), UtilError> { + fn complete_handshake(&mut self, io: &IoContext, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Sync + Clone { let connection = if let State::Handshake(ref mut h) = self.state { self.info.id = Some(h.id.clone()); try!(EncryptedConnection::new(h)) @@ -153,8 +153,8 @@ impl Session { panic!("Unexpected state"); }; self.state = State::Session(connection); - try!(self.write_hello(host)); - try!(self.send_ping()); + try!(self.write_hello(io, host)); + try!(self.send_ping(io)); Ok(()) } @@ -220,10 +220,11 @@ impl Session { } } if let Some(data) = packet_data { - return Ok(try!(self.read_packet(data, host))); + return Ok(try!(self.read_packet(io, data, host))); } if create_session { - try!(self.complete_handshake(host)); + try!(self.complete_handshake(io, host)); + io.update_registration(self.token()).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e)); } Ok(SessionData::None) } @@ -263,7 +264,8 @@ impl Session { } /// Send a protocol packet to peer. - pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> { + pub fn send_packet(&mut self, io: &IoContext, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> + where Message: Send + Sync + Clone { if self.info.capabilities.is_empty() || !self.had_hello { debug!(target: "network", "Sending to unconfirmed session {}, protocol: {}, packet: {}", self.token(), protocol, packet_id); return Err(From::from(NetworkError::BadProtocol)); @@ -283,7 +285,7 @@ impl Session { let mut rlp = RlpStream::new(); rlp.append(&(pid as u32)); rlp.append_raw(data, 1); - self.send(rlp) + self.send(io, rlp) } /// Keep this session alive. Returns false if ping timeout happened @@ -298,10 +300,9 @@ impl Session { }; if !timed_out && time::precise_time_ns() - self.ping_time_ns > PING_INTERVAL_SEC * 1000_000_000 { - if let Err(e) = self.send_ping() { + if let Err(e) = self.send_ping(io) { debug!("Error sending ping message: {:?}", e); } - io.update_registration(self.token()).unwrap_or_else(|e| debug!(target: "network", "Session registration error: {:?}", e)); } !timed_out } @@ -310,7 +311,8 @@ impl Session { self.connection().token() } - fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result { + fn read_packet(&mut self, io: &IoContext, packet: Packet, host: &HostInfo) -> Result + where Message: Send + Sync + Clone { if packet.data.len() < 2 { return Err(From::from(NetworkError::BadProtocol)); } @@ -321,7 +323,7 @@ impl Session { match packet_id { PACKET_HELLO => { let rlp = UntrustedRlp::new(&packet.data[1..]); //TODO: validate rlp expected size - try!(self.read_hello(&rlp, host)); + try!(self.read_hello(io, &rlp, host)); Ok(SessionData::Ready) }, PACKET_DISCONNECT => { @@ -330,7 +332,7 @@ impl Session { Err(From::from(NetworkError::Disconnect(DisconnectReason::from_u8(reason)))) } PACKET_PING => { - try!(self.send_pong()); + try!(self.send_pong(io)); Ok(SessionData::None) }, PACKET_PONG => { @@ -362,7 +364,7 @@ impl Session { } } - fn write_hello(&mut self, host: &HostInfo) -> Result<(), UtilError> { + fn write_hello(&mut self, io: &IoContext, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Sync + Clone { let mut rlp = RlpStream::new(); rlp.append_raw(&[PACKET_HELLO as u8], 0); rlp.begin_list(5) @@ -371,10 +373,11 @@ impl Session { .append(&host.capabilities) .append(&host.local_endpoint.address.port()) .append(host.id()); - self.send(rlp) + self.send(io, rlp) } - fn read_hello(&mut self, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), UtilError> { + fn read_hello(&mut self, io: &IoContext, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), UtilError> + where Message: Send + Sync + Clone { let protocol = try!(rlp.val_at::(0)); let client_version = try!(rlp.val_at::(1)); let peer_caps = try!(rlp.val_at::>(2)); @@ -417,36 +420,36 @@ impl Session { self.info.capabilities = caps; if self.info.capabilities.is_empty() { trace!(target: "network", "No common capabilities with peer."); - return Err(From::from(self.disconnect(DisconnectReason::UselessPeer))); + return Err(From::from(self.disconnect(io, DisconnectReason::UselessPeer))); } if protocol != host.protocol_version { trace!(target: "network", "Peer protocol version mismatch: {}", protocol); - return Err(From::from(self.disconnect(DisconnectReason::UselessPeer))); + return Err(From::from(self.disconnect(io, DisconnectReason::UselessPeer))); } self.had_hello = true; Ok(()) } /// Senf ping packet - pub fn send_ping(&mut self) -> Result<(), UtilError> { - try!(self.send(try!(Session::prepare(PACKET_PING)))); + pub fn send_ping(&mut self, io: &IoContext) -> Result<(), UtilError> where Message: Send + Sync + Clone { + try!(self.send(io, try!(Session::prepare(PACKET_PING)))); self.ping_time_ns = time::precise_time_ns(); self.pong_time_ns = None; Ok(()) } - fn send_pong(&mut self) -> Result<(), UtilError> { - self.send(try!(Session::prepare(PACKET_PONG))) + fn send_pong(&mut self, io: &IoContext) -> Result<(), UtilError> where Message: Send + Sync + Clone { + self.send(io, try!(Session::prepare(PACKET_PONG))) } /// Disconnect this session - pub fn disconnect(&mut self, reason: DisconnectReason) -> NetworkError { + pub fn disconnect(&mut self, io: &IoContext, reason: DisconnectReason) -> NetworkError where Message: Send + Sync + Clone { if let State::Session(_) = self.state { let mut rlp = RlpStream::new(); rlp.append(&(PACKET_DISCONNECT as u32)); rlp.begin_list(1); rlp.append(&(reason as u32)); - self.send(rlp).ok(); + self.send(io, rlp).ok(); } NetworkError::Disconnect(reason) } @@ -458,13 +461,13 @@ impl Session { Ok(rlp) } - fn send(&mut self, rlp: RlpStream) -> Result<(), UtilError> { + fn send(&mut self, io: &IoContext, rlp: RlpStream) -> Result<(), UtilError> where Message: Send + Sync + Clone { match self.state { State::Handshake(_) => { warn!(target:"network", "Unexpected send request"); }, State::Session(ref mut s) => { - try!(s.send_packet(&rlp.out())) + try!(s.send_packet(io, &rlp.out())) }, } Ok(())