diff --git a/Cargo.lock b/Cargo.lock index 28a4b8a7a..80bdcf349 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3697,6 +3697,24 @@ dependencies = [ "webpki 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "whisper-cli" +version = "0.1.0" +dependencies = [ + "docopt 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", + "ethcore-logger 1.11.0", + "ethcore-network 1.11.0", + "ethcore-network-devp2p 1.11.0", + "jsonrpc-core 8.0.1 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.11)", + "jsonrpc-http-server 8.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.11)", + "jsonrpc-pubsub 8.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.11)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "panic_hook 0.1.0", + "parity-whisper 0.1.0", + "serde 1.0.37 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.37 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "winapi" version = "0.2.8" diff --git a/Cargo.toml b/Cargo.toml index 202215ab3..bb92fff35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -132,7 +132,8 @@ members = [ "evmbin", "miner", "transaction-pool", - "whisper" + "whisper", + "whisper/cli", ] [patch.crates-io] diff --git a/whisper/cli/Cargo.toml b/whisper/cli/Cargo.toml new file mode 100644 index 000000000..363471a1a --- /dev/null +++ b/whisper/cli/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "whisper-cli" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +ethcore-network-devp2p = { path = "../../util/network-devp2p" } +ethcore-network = { path = "../../util/network" } +ethcore-logger = { path = "../../logger" } +parity-whisper = { path = "../" } +docopt = "0.8" +serde = "1.0" +serde_derive = "1.0" +panic_hook = { path = "../../util/panic_hook" } +jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" } +jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" } +jsonrpc-http-server = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" } +log = "0.3" + +[[bin]] +name = "whisper-cli" +path = "src/main.rs" diff --git a/whisper/cli/src/main.rs b/whisper/cli/src/main.rs new file mode 100644 index 000000000..d0866cd66 --- /dev/null +++ b/whisper/cli/src/main.rs @@ -0,0 +1,300 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Whisper command line interface +//! +//! Spawns an Ethereum network instance and attaches the Whisper protocol RPCs to it. +//! + +extern crate docopt; +extern crate ethcore_network_devp2p as devp2p; +extern crate ethcore_network as net; +extern crate parity_whisper as whisper; +extern crate serde; +extern crate panic_hook; + +extern crate jsonrpc_core; +extern crate jsonrpc_pubsub; +extern crate jsonrpc_http_server; +extern crate ethcore_logger as log; + +#[macro_use] +extern crate log as rlog; + +#[macro_use] +extern crate serde_derive; + +use docopt::Docopt; +use std::{fmt, io, process, env, sync::Arc}; +use jsonrpc_core::{Metadata, MetaIoHandler}; +use jsonrpc_pubsub::{PubSubMetadata, Session}; +use jsonrpc_http_server::{AccessControlAllowOrigin, DomainsValidation}; + +const POOL_UNIT: usize = 1024 * 1024; +const USAGE: &'static str = r#" +Whisper CLI. + Copyright 2017 Parity Technologies (UK) Ltd + +Usage: + whisper [options] + whisper [-h | --help] + +Options: + --whisper-pool-size SIZE Specify Whisper pool size [default: 10]. + -p, --port PORT Specify which RPC port to use [default: 8545]. + -a, --address ADDRESS Specify which address to use [default: 127.0.0.1]. + -l, --log LEVEL Specify the logging level. Must conform to the same format as RUST_LOG [default: Error]. + -h, --help Display this message and exit. +"#; + +#[derive(Clone, Default)] +struct Meta; + +impl Metadata for Meta {} + +impl PubSubMetadata for Meta { + fn session(&self) -> Option> { + None + } +} + +#[derive(Debug, Deserialize)] +struct Args { + flag_whisper_pool_size: usize, + flag_port: String, + flag_address: String, + flag_log: String, +} + +struct WhisperPoolHandle { + /// Pool handle. + handle: Arc>>, + /// Network manager. + net: Arc, +} + +impl whisper::rpc::PoolHandle for WhisperPoolHandle { + fn relay(&self, message: whisper::message::Message) -> bool { + let mut res = false; + let mut message = Some(message); + self.with_proto_context(whisper::net::PROTOCOL_ID, &mut |ctx| { + if let Some(message) = message.take() { + res = self.handle.post_message(message, ctx); + } + }); + res + } + + fn pool_status(&self) -> whisper::net::PoolStatus { + self.handle.pool_status() + } +} + +impl WhisperPoolHandle { + fn with_proto_context(&self, proto: net::ProtocolId, f: &mut FnMut(&net::NetworkContext)) { + self.net.with_context_eval(proto, f); + } +} + +struct RpcFactory { + handle: Arc>>, + manager: Arc, +} + +impl RpcFactory { + fn make_handler(&self, net: Arc) -> whisper::rpc::WhisperClient { + let whisper_pool_handle = WhisperPoolHandle { handle: self.handle.clone(), net: net }; + whisper::rpc::WhisperClient::new(whisper_pool_handle, self.manager.clone()) + } +} + +#[derive(Debug)] +enum Error { + Docopt(docopt::Error), + Io(io::Error), + JsonRpc(jsonrpc_core::Error), + Network(net::Error), + SockAddr(std::net::AddrParseError), + Logger(String), +} + +impl From for Error { + fn from(err: std::net::AddrParseError) -> Self { + Error::SockAddr(err) + } +} + +impl From for Error { + fn from(err: net::Error) -> Self { + Error::Network(err) + } +} + +impl From for Error { + fn from(err: docopt::Error) -> Self { + Error::Docopt(err) + } +} + +impl From for Error { + fn from(err: io::Error) -> Self { + Error::Io(err) + } +} + +impl From for Error { + fn from(err: jsonrpc_core::Error) -> Self { + Error::JsonRpc(err) + } +} + +impl From for Error { + fn from(err: String) -> Self { + Error::Logger(err) + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + match *self { + Error::SockAddr(ref e) => write!(f, "SockAddrError: {}", e), + Error::Docopt(ref e) => write!(f, "DocoptError: {}", e), + Error::Io(ref e) => write!(f, "IoError: {}", e), + Error::JsonRpc(ref e) => write!(f, "JsonRpcError: {:?}", e), + Error::Network(ref e) => write!(f, "NetworkError: {}", e), + Error::Logger(ref e) => write!(f, "LoggerError: {}", e), + } + } +} + +fn main() { + panic_hook::set(); + + match execute(env::args()) { + Ok(_) => { + println!("whisper-cli terminated"); + process::exit(1); + } + Err(err) => { + println!("{}", err); + process::exit(1); + }, + } +} + +fn execute(command: I) -> Result<(), Error> where I: IntoIterator, S: AsRef { + + // Parse arguments + let args: Args = Docopt::new(USAGE).and_then(|d| d.argv(command).deserialize())?; + let pool_size = args.flag_whisper_pool_size * POOL_UNIT; + let url = format!("{}:{}", args.flag_address, args.flag_port); + + initialize_logger(args.flag_log)?; + info!(target: "whisper-cli", "start"); + + // Filter manager that will dispatch `decryption tasks` + let manager = Arc::new(whisper::rpc::FilterManager::new()?); + + // Whisper protocol network handler + let whisper_network_handler = Arc::new(whisper::net::Network::new(pool_size, manager.clone())); + + // Create network service + let network = devp2p::NetworkService::new(net::NetworkConfiguration::new_local(), None)?; + + // Start network service + network.start()?; + + // Attach whisper protocol to the network service + network.register_protocol(whisper_network_handler.clone(), whisper::net::PROTOCOL_ID, whisper::net::PACKET_COUNT, + whisper::net::SUPPORTED_VERSIONS)?; + network.register_protocol(Arc::new(whisper::net::ParityExtensions), whisper::net::PARITY_PROTOCOL_ID, + whisper::net::PACKET_COUNT, whisper::net::SUPPORTED_VERSIONS)?; + + // Request handler + let mut io = MetaIoHandler::default(); + + // Shared network service + let shared_network = Arc::new(network); + + // Pool handler + let whisper_factory = RpcFactory { handle: whisper_network_handler, manager: manager }; + + io.extend_with(whisper::rpc::Whisper::to_delegate(whisper_factory.make_handler(shared_network.clone()))); + io.extend_with(whisper::rpc::WhisperPubSub::to_delegate(whisper_factory.make_handler(shared_network.clone()))); + + let server = jsonrpc_http_server::ServerBuilder::new(io) + .cors(DomainsValidation::AllowOnly(vec![AccessControlAllowOrigin::Null])) + .start_http(&url.parse()?)?; + + server.wait(); + + // This will never return if the http server runs without errors + Ok(()) +} + +fn initialize_logger(log_level: String) -> Result<(), String> { + let mut l = log::Config::default(); + l.mode = Some(log_level); + log::setup_log(&l)?; + Ok(()) +} + + +#[cfg(test)] +mod tests { + use super::execute; + + #[test] + fn invalid_argument() { + let command = vec!["whisper-cli", "--foo=12"] + .into_iter() + .map(Into::into) + .collect::>(); + + assert!(execute(command).is_err()); + } + + #[test] + #[ignore] + fn privileged_port() { + let command = vec!["whisper-cli", "--port=3"] + .into_iter() + .map(Into::into) + .collect::>(); + + assert!(execute(command).is_err()); + } + + #[test] + fn invalid_ip_address() { + let command = vec!["whisper-cli", "--address=x.x.x.x"] + .into_iter() + .map(Into::into) + .collect::>(); + + assert!(execute(command).is_err()); + } + + #[test] + fn invalid_whisper_pool_size() { + let command = vec!["whisper-cli", "--whisper-pool-size=-100000000000000000000000000000000000000"] + .into_iter() + .map(Into::into) + .collect::>(); + + assert!(execute(command).is_err()); + } +} diff --git a/whisper/src/rpc/filter.rs b/whisper/src/rpc/filter.rs index 663c6a6bf..5a192ac04 100644 --- a/whisper/src/rpc/filter.rs +++ b/whisper/src/rpc/filter.rs @@ -17,8 +17,7 @@ //! Abstraction over filters which works with polling and subscription. use std::collections::HashMap; -use std::sync::{mpsc, Arc}; -use std::thread; +use std::{sync::{Arc, atomic, atomic::AtomicBool, mpsc}, thread}; use ethereum_types::{H256, H512}; use ethkey::Public; @@ -27,8 +26,7 @@ use parking_lot::{Mutex, RwLock}; use rand::{Rng, OsRng}; use message::{Message, Topic}; -use super::key_store::KeyStore; -use super::types::{self, FilterItem, HexEncode}; +use super::{key_store::KeyStore, types::{self, FilterItem, HexEncode}}; /// Kinds of filters, #[derive(PartialEq, Eq, Clone, Copy)] @@ -53,6 +51,7 @@ pub struct Manager { filters: RwLock>, tx: Mutex>>, join: Option>, + exit: Arc, } impl Manager { @@ -60,15 +59,29 @@ impl Manager { /// the given thread pool. pub fn new() -> ::std::io::Result { let (tx, rx) = mpsc::channel::>(); + let exit = Arc::new(AtomicBool::new(false)); + let e = exit.clone(); + let join_handle = thread::Builder::new() .name("Whisper Decryption Worker".to_string()) - .spawn(move || for item in rx { (item)() })?; + .spawn(move || { + trace!(target: "parity_whisper", "Start decryption worker"); + loop { + if exit.load(atomic::Ordering::Acquire) { + break; + } + if let Ok(item) = rx.try_recv() { + item(); + } + } + })?; Ok(Manager { key_store: Arc::new(RwLock::new(KeyStore::new()?)), filters: RwLock::new(HashMap::new()), tx: Mutex::new(tx), join: Some(join_handle), + exit: e, }) } @@ -103,7 +116,7 @@ impl Manager { } /// Insert new subscription filter. Generates a secure ID and sends it to - /// the + /// the subscriber pub fn insert_subscription(&self, filter: Filter, sub: Subscriber) -> Result<(), &'static str> { @@ -180,9 +193,12 @@ impl ::net::MessageHandler for Arc { impl Drop for Manager { fn drop(&mut self) { + trace!(target: "parity_whisper", "waiting to drop FilterManager"); + self.exit.store(true, atomic::Ordering::Release); if let Some(guard) = self.join.take() { let _ = guard.join(); } + trace!(target: "parity_whisper", "FilterManager dropped"); } }