From 99075ad22a8c629d33520aae3ce90c91d9418a1b Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 14 Jul 2017 20:40:28 +0200 Subject: [PATCH] Initial Whisper implementation (#6009) * whisper skeleton * basic message store * rallying and message logic * pass host info to network protocol handlers * choose who starts rally based on node key * module reshuffling * mining messages * prune messages by low PoW until below size target * associated error type for ethkey generators and `OsRng` generator * beginnings of RPC * generic message handler for whisper * reshuffle code order * standard payload encoding and decoding * basic crypto * minor restructuring of net code * implement shh_post * merge? * implement filters * rand trait for hash types * filter RPCs for whisper * symmetric encryption of payload * pub-sub * filter tests * use only secure random IDs * attach arbitrary protocols to network * basic integration of whisper into Parity * eagerly prune low PoW entries * broadcast messages with salted topics * node info RPC * fix import * fix leading zeros calculation * address minor grumbles --- Cargo.lock | 47 ++ Cargo.toml | 3 +- ethcore/light/src/net/mod.rs | 4 +- ethcore/src/engines/epoch_verifier.rs | 49 ++ ethkey/src/brain.rs | 15 +- ethkey/src/lib.rs | 32 +- ethkey/src/math.rs | 6 + ethkey/src/prefix.rs | 2 + ethkey/src/random.rs | 24 +- parity/cli/config.full.toml | 4 + parity/cli/config.toml | 4 + parity/cli/mod.rs | 29 +- parity/cli/usage.txt | 5 + parity/configuration.rs | 11 + parity/main.rs | 2 + parity/modules.rs | 5 +- parity/rpc.rs | 2 - parity/rpc_apis.rs | 61 ++- parity/run.rs | 28 ++ parity/sync.rs | 7 +- parity/whisper.rs | 77 ++++ sync/src/api.rs | 56 ++- util/bigint/src/hash.rs | 12 +- util/network/src/host.rs | 5 +- util/network/src/lib.rs | 8 +- util/network/src/tests.rs | 2 +- whisper/Cargo.toml | 32 ++ whisper/README.md | 3 + whisper/src/lib.rs | 59 +++ whisper/src/message.rs | 479 +++++++++++++++++++ whisper/src/net.rs | 638 ++++++++++++++++++++++++++ whisper/src/rpc/crypto.rs | 316 +++++++++++++ whisper/src/rpc/filter.rs | 416 +++++++++++++++++ whisper/src/rpc/key_store.rs | 197 ++++++++ whisper/src/rpc/mod.rs | 402 ++++++++++++++++ whisper/src/rpc/payload.rs | 357 ++++++++++++++ whisper/src/rpc/types.rs | 298 ++++++++++++ 37 files changed, 3642 insertions(+), 55 deletions(-) create mode 100644 ethcore/src/engines/epoch_verifier.rs create mode 100644 parity/whisper.rs create mode 100644 whisper/Cargo.toml create mode 100644 whisper/README.md create mode 100644 whisper/src/lib.rs create mode 100644 whisper/src/message.rs create mode 100644 whisper/src/net.rs create mode 100644 whisper/src/rpc/crypto.rs create mode 100644 whisper/src/rpc/filter.rs create mode 100644 whisper/src/rpc/key_store.rs create mode 100644 whisper/src/rpc/mod.rs create mode 100644 whisper/src/rpc/payload.rs create mode 100644 whisper/src/rpc/types.rs diff --git a/Cargo.lock b/Cargo.lock index ad3cca5c7..e97b1c772 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1014,6 +1014,11 @@ dependencies = [ "unicode-segmentation 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "hex" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "hidapi" version = "0.3.1" @@ -1692,6 +1697,15 @@ name = "order-stat" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "ordered-float" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-traits 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)", + "unreachable 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "owning_ref" version = "0.3.3" @@ -1741,6 +1755,7 @@ dependencies = [ "parity-rpc 1.8.0", "parity-rpc-client 1.4.0", "parity-updater 1.8.0", + "parity-whisper 0.1.0", "path 0.1.0", "pretty_assertions 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "regex 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1998,6 +2013,36 @@ dependencies = [ "parking_lot 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "parity-whisper" +version = "0.1.0" +dependencies = [ + "bitflags 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "ethcore-bigint 0.1.3", + "ethcore-network 1.8.0", + "ethcrypto 0.1.0", + "ethkey 0.2.0", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "hex 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", + "jsonrpc-macros 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", + "jsonrpc-pubsub 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", + "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", + "ordered-float 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", + "ring 0.9.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rlp 0.2.0", + "serde 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", + "tiny-keccak 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "parity-wordlist" version = "1.0.1" @@ -3128,6 +3173,7 @@ dependencies = [ "checksum hamming 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "65043da274378d68241eb9a8f8f8aa54e349136f7b8e12f63e3ef44043cc30e1" "checksum heapsize 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4c7593b1522161003928c959c20a2ca421c68e940d63d75573316a009e48a6d4" "checksum heck 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6f807d2f64cc044a6bcf250ff23e59be0deec7a16612c014f962a06fa7e020f9" +"checksum hex 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d6a22814455d41612f41161581c2883c0c6a1c41852729b17d5ed88f01e153aa" "checksum hidapi 0.3.1 (git+https://github.com/paritytech/hidapi-rs)" = "" "checksum httparse 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "46534074dbb80b070d60a5cb8ecadd8963a00a438ae1a95268850a7ef73b67ae" "checksum hyper 0.10.0-a.0 (git+https://github.com/paritytech/hyper)" = "" @@ -3197,6 +3243,7 @@ dependencies = [ "checksum openssl 0.9.13 (registry+https://github.com/rust-lang/crates.io-index)" = "b34cd77cf91301fff3123fbd46b065c3b728b17a392835de34c397315dce5586" "checksum openssl-sys 0.9.13 (registry+https://github.com/rust-lang/crates.io-index)" = "e035022a50faa380bd7ccdbd184d946ce539ebdb0a358780de92a995882af97a" "checksum order-stat 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "efa535d5117d3661134dbf1719b6f0ffe06f2375843b13935db186cd094105eb" +"checksum ordered-float 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "58d25b6c0e47b20d05226d288ff434940296e7e2f8b877975da32f862152241f" "checksum owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37" "checksum parity-dapps-glue 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e1d06f6ee0fda786df3784a96ee3f0629f529b91cbfb7d142f6410e6bcd1ce2c" "checksum parity-tokio-ipc 0.1.5 (git+https://github.com/nikvolf/parity-tokio-ipc)" = "" diff --git a/Cargo.toml b/Cargo.toml index 93341b564..d541636da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ parity-reactor = { path = "util/reactor" } parity-rpc = { path = "rpc" } parity-rpc-client = { path = "rpc_client" } parity-updater = { path = "updater" } +parity-whisper = { path = "whisper" } path = { path = "util/path" } parity-dapps = { path = "dapps", optional = true } @@ -106,4 +107,4 @@ lto = false panic = "abort" [workspace] -members = ["ethstore/cli", "ethkey/cli", "evmbin"] +members = ["ethstore/cli", "ethkey/cli", "evmbin", "whisper"] diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 27afed3d5..f823678a9 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -21,7 +21,7 @@ use ethcore::transaction::UnverifiedTransaction; use io::TimerToken; -use network::{NetworkProtocolHandler, NetworkContext, PeerId}; +use network::{HostInfo, NetworkProtocolHandler, NetworkContext, PeerId}; use rlp::{RlpStream, UntrustedRlp}; use util::hash::H256; use util::{DBValue, Mutex, RwLock, U256}; @@ -1074,7 +1074,7 @@ fn punish(peer: PeerId, io: &IoContext, e: Error) { } impl NetworkProtocolHandler for LightProtocol { - fn initialize(&self, io: &NetworkContext) { + fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS) .expect("Error registering sync timer."); io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL_MS) diff --git a/ethcore/src/engines/epoch_verifier.rs b/ethcore/src/engines/epoch_verifier.rs new file mode 100644 index 000000000..cd712baef --- /dev/null +++ b/ethcore/src/engines/epoch_verifier.rs @@ -0,0 +1,49 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +// Epoch verifiers. + +use error::Error; +use header::Header; + +/// Verifier for all blocks within an epoch with self-contained state. +/// +/// See docs on `Engine` relating to proving functions for more details. +pub trait EpochVerifier: Send + Sync { + /// Get the epoch number. + fn epoch_number(&self) -> u64; + + /// Lightly verify the next block header. + /// This may not be a header belonging to a different epoch. + fn verify_light(&self, header: &Header) -> Result<(), Error>; + + /// Perform potentially heavier checks on the next block header. + fn verify_heavy(&self, header: &Header) -> Result<(), Error> { + self.verify_light(header) + } + + /// Check if the header is the end of an epoch. + fn is_epoch_end(&self, header: &Header, Ancestry) -> EpochChange; + +} + +/// Special "no-op" verifier for stateless, epoch-less engines. +pub struct NoOp; + +impl EpochVerifier for NoOp { + fn epoch_number(&self) -> u64 { 0 } + fn verify_light(&self, _header: &Header) -> Result<(), Error> { Ok(()) } +} diff --git a/ethkey/src/brain.rs b/ethkey/src/brain.rs index 9976bdb01..8bc9d2cd7 100644 --- a/ethkey/src/brain.rs +++ b/ethkey/src/brain.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use keccak::Keccak256; -use super::{KeyPair, Error, Generator, Secret}; +use super::{KeyPair, Generator, Secret}; /// Simple brainwallet. pub struct Brain(String); @@ -27,7 +27,9 @@ impl Brain { } impl Generator for Brain { - fn generate(self) -> Result { + type Error = ::Void; + + fn generate(self) -> Result { let seed = self.0; let mut secret = seed.into_bytes().keccak256(); @@ -38,11 +40,10 @@ impl Generator for Brain { match i > 16384 { false => i += 1, true => { - if let Ok(secret) = Secret::from_unsafe_slice(&secret) { - let result = KeyPair::from_secret(secret); - if result.as_ref().ok().map_or(false, |r| r.address()[0] == 0) { - return result; - } + if let Ok(pair) = Secret::from_unsafe_slice(&secret) + .and_then(KeyPair::from_secret) + { + if pair.address()[0] == 0 { return Ok(pair) } } }, } diff --git a/ethkey/src/lib.rs b/ethkey/src/lib.rs index e87e198df..271a5f166 100644 --- a/ethkey/src/lib.rs +++ b/ethkey/src/lib.rs @@ -15,8 +15,6 @@ // along with Parity. If not, see . extern crate rand; -#[macro_use] -extern crate lazy_static; extern crate tiny_keccak; extern crate secp256k1; extern crate rustc_hex; @@ -24,6 +22,9 @@ extern crate ethcore_bigint as bigint; extern crate crypto as rcrypto; extern crate byteorder; +#[macro_use] +extern crate lazy_static; + mod brain; mod error; mod keypair; @@ -34,21 +35,12 @@ mod signature; mod secret; mod extended; -lazy_static! { - pub static ref SECP256K1: secp256k1::Secp256k1 = secp256k1::Secp256k1::new(); -} - -/// Generates new keypair. -pub trait Generator { - /// Should be called to generate new keypair. - fn generate(self) -> Result; -} - pub mod math; pub use self::brain::Brain; pub use self::error::Error; pub use self::keypair::{KeyPair, public_to_address}; +pub use self::math::public_is_valid; pub use self::prefix::Prefix; pub use self::random::Random; pub use self::signature::{sign, verify_public, verify_address, recover, Signature}; @@ -57,6 +49,22 @@ pub use self::extended::{ExtendedPublic, ExtendedSecret, ExtendedKeyPair, Deriva use bigint::hash::{H160, H256, H512}; +lazy_static! { + pub static ref SECP256K1: secp256k1::Secp256k1 = secp256k1::Secp256k1::new(); +} + +/// Uninstantiatable error type for infallible generators. +#[derive(Debug)] +pub enum Void {} + +/// Generates new keypair. +pub trait Generator { + type Error; + + /// Should be called to generate new keypair. + fn generate(self) -> Result; +} + pub type Address = H160; pub type Message = H256; pub type Public = H512; diff --git a/ethkey/src/math.rs b/ethkey/src/math.rs index cf7012132..4c875bd8f 100644 --- a/ethkey/src/math.rs +++ b/ethkey/src/math.rs @@ -20,6 +20,12 @@ use secp256k1::constants::{GENERATOR_X, GENERATOR_Y, CURVE_ORDER}; use bigint::prelude::U256; use bigint::hash::H256; +/// Whether the public key is valid. +pub fn public_is_valid(public: &Public) -> bool { + to_secp256k1_public(public).ok() + .map_or(false, |p| p.is_valid()) +} + /// Inplace multiply public key by secret key (EC point * scalar) pub fn public_mul_secret(public: &mut Public, secret: &Secret) -> Result<(), Error> { let key_secret = secret.to_secp256k1_secret()?; diff --git a/ethkey/src/prefix.rs b/ethkey/src/prefix.rs index 174c877b0..25b1ab3f7 100644 --- a/ethkey/src/prefix.rs +++ b/ethkey/src/prefix.rs @@ -32,6 +32,8 @@ impl Prefix { } impl Generator for Prefix { + type Error = Error; + fn generate(self) -> Result { for _ in 0..self.iterations { let keypair = Random.generate()?; diff --git a/ethkey/src/random.rs b/ethkey/src/random.rs index b84402427..715dd3cb5 100644 --- a/ethkey/src/random.rs +++ b/ethkey/src/random.rs @@ -15,18 +15,30 @@ // along with Parity. If not, see . use rand::os::OsRng; -use super::{Generator, KeyPair, Error, SECP256K1}; +use super::{Generator, KeyPair, SECP256K1}; -/// Randomly generates new keypair. +/// Randomly generates new keypair, instantiating the RNG each time. pub struct Random; impl Generator for Random { - fn generate(self) -> Result { - let context = &SECP256K1; + type Error = ::std::io::Error; + + fn generate(self) -> Result { let mut rng = OsRng::new()?; - let (sec, publ) = context.generate_keypair(&mut rng)?; + match rng.generate() { + Ok(pair) => Ok(pair), + Err(void) => match void {}, // LLVM unreachable + } + } +} + +impl<'a> Generator for &'a mut OsRng { + type Error = ::Void; + + fn generate(self) -> Result { + let (sec, publ) = SECP256K1.generate_keypair(self) + .expect("context always created with full capabilities; qed"); Ok(KeyPair::from_keypair(sec, publ)) } } - diff --git a/parity/cli/config.full.toml b/parity/cli/config.full.toml index 581871997..b0e91e56c 100644 --- a/parity/cli/config.full.toml +++ b/parity/cli/config.full.toml @@ -144,3 +144,7 @@ jit = false logging = "own_tx=trace" log_file = "/var/log/parity.log" color = true + +[whisper] +enabled = false +pool_size = 20 diff --git a/parity/cli/config.toml b/parity/cli/config.toml index 0ad9e7753..4af4ca076 100644 --- a/parity/cli/config.toml +++ b/parity/cli/config.toml @@ -84,3 +84,7 @@ log_file = "/var/log/parity.log" color = true ports_shift = 0 unsafe_expose = false + +[whisper] +enabled = true +pool_size = 50 diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index 262e054a2..fa3df6123 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -176,7 +176,7 @@ usage! { or |c: &Config| otry!(c.rpc).interface.clone(), flag_jsonrpc_cors: Option = None, or |c: &Config| otry!(c.rpc).cors.clone().map(Some), - flag_jsonrpc_apis: String = "web3,eth,pubsub,net,parity,parity_pubsub,traces,rpc,secretstore", + flag_jsonrpc_apis: String = "web3,eth,pubsub,net,parity,parity_pubsub,traces,rpc,secretstore,shh,shh_pubsub", or |c: &Config| otry!(c.rpc).apis.as_ref().map(|vec| vec.join(",")), flag_jsonrpc_hosts: String = "none", or |c: &Config| otry!(c.rpc).hosts.as_ref().map(|vec| vec.join(",")), @@ -192,7 +192,7 @@ usage! { or |c: &Config| otry!(c.websockets).port.clone(), flag_ws_interface: String = "local", or |c: &Config| otry!(c.websockets).interface.clone(), - flag_ws_apis: String = "web3,eth,pubsub,net,parity,parity_pubsub,traces,rpc,secretstore", + flag_ws_apis: String = "web3,eth,pubsub,net,parity,parity_pubsub,traces,rpc,secretstore,shh,shh_pubsub", or |c: &Config| otry!(c.websockets).apis.as_ref().map(|vec| vec.join(",")), flag_ws_origins: String = "chrome-extension://*", or |c: &Config| otry!(c.websockets).origins.as_ref().map(|vec| vec.join(",")), @@ -204,7 +204,7 @@ usage! { or |c: &Config| otry!(c.ipc).disable.clone(), flag_ipc_path: String = if cfg!(windows) { r"\\.\pipe\jsonrpc.ipc" } else { "$BASE/jsonrpc.ipc" }, or |c: &Config| otry!(c.ipc).path.clone(), - flag_ipc_apis: String = "web3,eth,pubsub,net,parity,parity_pubsub,parity_accounts,traces,rpc,secretstore", + flag_ipc_apis: String = "web3,eth,pubsub,net,parity,parity_pubsub,parity_accounts,traces,rpc,secretstore,shh,shh_pubsub", or |c: &Config| otry!(c.ipc).apis.as_ref().map(|vec| vec.join(",")), // DAPPS @@ -365,6 +365,12 @@ usage! { flag_no_color: bool = false, or |c: &Config| otry!(c.misc).color.map(|c| !c).clone(), + // -- Whisper options + flag_whisper: bool = false, + or |c: &Config| otry!(c.whisper).enabled, + flag_whisper_pool_size: usize = 10usize, + or |c: &Config| otry!(c.whisper).pool_size.clone(), + // -- Legacy Options supported in configs flag_dapps_port: Option = None, or |c: &Config| otry!(c.dapps).port.clone().map(Some), @@ -407,6 +413,7 @@ struct Config { vm: Option, misc: Option, stratum: Option, + whisper: Option, } #[derive(Default, Debug, PartialEq, Deserialize)] @@ -603,12 +610,18 @@ struct Misc { unsafe_expose: Option, } +#[derive(Default, Debug, PartialEq, Deserialize)] +struct Whisper { + enabled: Option, + pool_size: Option, +} + #[cfg(test)] mod tests { use super::{ Args, ArgsError, Config, Operating, Account, Ui, Network, Ws, Rpc, Ipc, Dapps, Ipfs, Mining, Footprint, - Snapshots, VM, Misc, SecretStore, + Snapshots, VM, Misc, Whisper, SecretStore, }; use toml; @@ -860,6 +873,10 @@ mod tests { // -- Virtual Machine Options flag_jitvm: false, + // -- Whisper options. + flag_whisper: false, + flag_whisper_pool_size: 20, + // -- Legacy Options flag_geth: false, flag_testnet: false, @@ -1082,6 +1099,10 @@ mod tests { ports_shift: Some(0), unsafe_expose: Some(false), }), + whisper: Some(Whisper { + enabled: Some(true), + pool_size: Some(50), + }), stratum: None, }); } diff --git a/parity/cli/usage.txt b/parity/cli/usage.txt index dc4796e05..1bfb8c499 100644 --- a/parity/cli/usage.txt +++ b/parity/cli/usage.txt @@ -425,6 +425,11 @@ Snapshot Options: Virtual Machine Options: --jitvm Enable the JIT VM. (default: {flag_jitvm}) +Whisper Options: + --whisper Enable the Whisper network. (default: {flag_whisper}) + --whisper-pool-size MB Target size of the whisper message pool in megabytes. + (default: {flag_whisper_pool_size}) + Legacy Options: --geth Run in Geth-compatibility mode. Sets the IPC path to be the same as Geth's. Overrides the --ipc-path diff --git a/parity/configuration.rs b/parity/configuration.rs index fe397dff5..ee06b6b46 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -338,6 +338,8 @@ impl Configuration { _ => (self.gas_pricer_config()?, self.miner_options(self.args.flag_reseal_min_period)?), }; + let whisper_config = self.whisper_config(); + let run_cmd = RunCmd { cache_config: cache_config, dirs: dirs, @@ -383,6 +385,7 @@ impl Configuration { serve_light: !self.args.flag_no_serve_light, light: self.args.flag_light, no_persistent_txqueue: self.args.flag_no_persistent_txqueue, + whisper: whisper_config, }; Cmd::Run(run_cmd) }; @@ -1068,6 +1071,13 @@ impl Configuration { settings } + + fn whisper_config(&self) -> ::whisper::Config { + ::whisper::Config { + enabled: self.args.flag_whisper, + target_message_pool_size: self.args.flag_whisper_pool_size * 1024 * 1024, + } + } } #[cfg(test)] @@ -1326,6 +1336,7 @@ mod tests { serve_light: true, light: false, no_persistent_txqueue: false, + whisper: Default::default(), }; expected.secretstore_conf.enabled = cfg!(feature = "secretstore"); assert_eq!(conf.into_command().unwrap().cmd, Cmd::Run(expected)); diff --git a/parity/main.rs b/parity/main.rs index 21a7cc2d4..c35cdb6ad 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -63,6 +63,7 @@ extern crate parity_local_store as local_store; extern crate parity_reactor; extern crate parity_rpc; extern crate parity_updater as updater; +extern crate parity_whisper; extern crate path; extern crate rpc_cli; @@ -110,6 +111,7 @@ mod snapshot; mod upgrade; mod url; mod user_defaults; +mod whisper; #[cfg(feature="ipc")] mod boot; diff --git a/parity/modules.rs b/parity/modules.rs index 032f87c46..c7aea7dfa 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -19,7 +19,7 @@ use std::path::Path; use ethcore::client::BlockChainClient; use hypervisor::Hypervisor; -use ethsync::{SyncConfig, NetworkConfiguration, NetworkError, Params}; +use ethsync::{AttachedProtocol, SyncConfig, NetworkConfiguration, NetworkError, Params}; use ethcore::snapshot::SnapshotService; use light::Provider; @@ -151,6 +151,7 @@ pub fn sync( _snapshot_service: Arc, _provider: Arc, log_settings: &LogConfig, + _attached_protos: Vec, ) -> Result { let mut hypervisor = hypervisor_ref.take().expect("There should be hypervisor for ipc configuration"); let args = sync_arguments(&hypervisor.io_path, sync_cfg, net_cfg, log_settings); @@ -181,6 +182,7 @@ pub fn sync( snapshot_service: Arc, provider: Arc, _log_settings: &LogConfig, + attached_protos: Vec, ) -> Result { let eth_sync = EthSync::new(Params { config: sync_cfg, @@ -188,6 +190,7 @@ pub fn sync( provider: provider, snapshot_service: snapshot_service, network_config: net_cfg, + attached_protos: attached_protos, })?; Ok((eth_sync.clone() as Arc, eth_sync.clone() as Arc, eth_sync.clone() as Arc)) diff --git a/parity/rpc.rs b/parity/rpc.rs index b79e23b79..95c6f0bbf 100644 --- a/parity/rpc.rs +++ b/parity/rpc.rs @@ -32,7 +32,6 @@ pub use parity_rpc::{IpcServer, HttpServer, RequestMiddleware}; pub use parity_rpc::ws::Server as WsServer; pub use parity_rpc::informant::CpuPool; - pub const DAPPS_DOMAIN: &'static str = "web3.site"; #[derive(Debug, Clone, PartialEq)] @@ -168,7 +167,6 @@ impl Default for WsConfiguration { } } - impl WsConfiguration { pub fn address(&self) -> Option<(String, u16)> { match self.enabled { diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index bcac866ff..234a7d299 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -67,6 +67,12 @@ pub enum Api { Rpc, /// SecretStore (Safe) SecretStore, + /// Whisper (Safe) + // TODO: _if_ someone guesses someone else's key or filter IDs they can remove + // BUT these are all ephemeral so it seems fine. + Whisper, + /// Whisper Pub-Sub (Safe but same concerns as above). + WhisperPubSub, } impl FromStr for Api { @@ -89,6 +95,8 @@ impl FromStr for Api { "traces" => Ok(Traces), "rpc" => Ok(Rpc), "secretstore" => Ok(SecretStore), + "shh" => Ok(Whisper), + "shh_pubsub" => Ok(WhisperPubSub), api => Err(format!("Unknown api: {}", api)) } } @@ -172,6 +180,8 @@ fn to_modules(apis: &HashSet) -> BTreeMap { Api::Traces => ("traces", "1.0"), Api::Rpc => ("rpc", "1.0"), Api::SecretStore => ("secretstore", "1.0"), + Api::Whisper => ("shh", "1.0"), + Api::WhisperPubSub => ("shh_pubsub", "1.0"), }; modules.insert(name.into(), version.into()); } @@ -213,6 +223,7 @@ pub struct FullDependencies { pub ws_address: Option<(String, u16)>, pub fetch: FetchClient, pub remote: parity_reactor::Remote, + pub whisper_rpc: Option<::whisper::RpcFactory>, } impl FullDependencies { @@ -335,6 +346,18 @@ impl FullDependencies { Api::SecretStore => { handler.extend_with(SecretStoreClient::new(&self.secret_store).to_delegate()); }, + Api::Whisper => { + if let Some(ref whisper_rpc) = self.whisper_rpc { + let whisper = whisper_rpc.make_handler(); + handler.extend_with(::parity_whisper::rpc::Whisper::to_delegate(whisper)); + } + } + Api::WhisperPubSub => { + if let Some(ref whisper_rpc) = self.whisper_rpc { + let whisper = whisper_rpc.make_handler(); + handler.extend_with(::parity_whisper::rpc::WhisperPubSub::to_delegate(whisper)); + } + } } } } @@ -383,6 +406,7 @@ pub struct LightDependencies { pub fetch: FetchClient, pub geth_compatibility: bool, pub remote: parity_reactor::Remote, + pub whisper_rpc: Option<::whisper::RpcFactory>, } impl LightDependencies { @@ -516,6 +540,18 @@ impl LightDependencies { let secret_store = Some(self.secret_store.clone()); handler.extend_with(SecretStoreClient::new(&secret_store).to_delegate()); }, + Api::Whisper => { + if let Some(ref whisper_rpc) = self.whisper_rpc { + let whisper = whisper_rpc.make_handler(); + handler.extend_with(::parity_whisper::rpc::Whisper::to_delegate(whisper)); + } + } + Api::WhisperPubSub => { + if let Some(ref whisper_rpc) = self.whisper_rpc { + let whisper = whisper_rpc.make_handler(); + handler.extend_with(::parity_whisper::rpc::WhisperPubSub::to_delegate(whisper)); + } + } } } } @@ -543,8 +579,17 @@ impl ApiSet { pub fn list_apis(&self) -> HashSet { let mut public_list = [ - Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::Rpc, Api::SecretStore, + Api::Web3, + Api::Net, + Api::Eth, + Api::EthPubSub, + Api::Parity, + Api::Rpc, + Api::SecretStore, + Api::Whisper, + Api::WhisperPubSub, ].into_iter().cloned().collect(); + match *self { ApiSet::List(ref apis) => apis.clone(), ApiSet::PublicContext => public_list, @@ -605,6 +650,8 @@ mod test { assert_eq!(Api::Traces, "traces".parse().unwrap()); assert_eq!(Api::Rpc, "rpc".parse().unwrap()); assert_eq!(Api::SecretStore, "secretstore".parse().unwrap()); + assert_eq!(Api::Whisper, "shh".parse().unwrap()); + assert_eq!(Api::WhisperPubSub, "shh_pubsub".parse().unwrap()); assert!("rp".parse::().is_err()); } @@ -622,7 +669,7 @@ mod test { fn test_api_set_unsafe_context() { let expected = vec![ // make sure this list contains only SAFE methods - Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore + Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, Api::Whisper, Api::WhisperPubSub, ].into_iter().collect(); assert_eq!(ApiSet::UnsafeContext.list_apis(), expected); } @@ -631,7 +678,7 @@ mod test { fn test_api_set_ipc_context() { let expected = vec![ // safe - Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, + Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, Api::Whisper, Api::WhisperPubSub, // semi-safe Api::ParityAccounts ].into_iter().collect(); @@ -642,7 +689,7 @@ mod test { fn test_api_set_safe_context() { let expected = vec![ // safe - Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, + Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, Api::Whisper, Api::WhisperPubSub, // semi-safe Api::ParityAccounts, // Unsafe @@ -654,7 +701,7 @@ mod test { #[test] fn test_all_apis() { assert_eq!("all".parse::().unwrap(), ApiSet::List(vec![ - Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, + Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, Api::Whisper, Api::WhisperPubSub, Api::ParityAccounts, Api::ParitySet, Api::Signer, Api::Personal @@ -664,7 +711,7 @@ mod test { #[test] fn test_all_without_personal_apis() { assert_eq!("personal,all,-personal".parse::().unwrap(), ApiSet::List(vec![ - Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, + Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, Api::Whisper, Api::WhisperPubSub, Api::ParityAccounts, Api::ParitySet, Api::Signer, ].into_iter().collect())); @@ -673,7 +720,7 @@ mod test { #[test] fn test_safe_parsing() { assert_eq!("safe".parse::().unwrap(), ApiSet::List(vec![ - Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, + Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, Api::Whisper, Api::WhisperPubSub, ].into_iter().collect())); } } diff --git a/parity/run.rs b/parity/run.rs index 30f4c8759..4e7a16376 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -115,6 +115,7 @@ pub struct RunCmd { pub serve_light: bool, pub light: bool, pub no_persistent_txqueue: bool, + pub whisper: ::whisper::Config } pub fn open_ui(ws_conf: &rpc::WsConfiguration, ui_conf: &rpc::UiConfiguration) -> Result<(), String> { @@ -230,6 +231,17 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> // start on_demand service. let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone())); + let mut attached_protos = Vec::new(); + let whisper_factory = if cmd.whisper.enabled { + let (whisper_net, whisper_factory) = ::whisper::setup(cmd.whisper.target_message_pool_size) + .map_err(|e| format!("Failed to initialize whisper: {}", e))?; + + attached_protos.push(whisper_net); + whisper_factory + } else { + None + }; + // set network path. net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned()); let sync_params = LightSyncParams { @@ -238,6 +250,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> network_id: cmd.network_id.unwrap_or(spec.network_id()), subprotocol_name: ethsync::LIGHT_PROTOCOL, handlers: vec![on_demand.clone()], + attached_protos: attached_protos, }; let light_sync = LightSync::new(sync_params).map_err(|e| format!("Error starting network: {}", e))?; let light_sync = Arc::new(light_sync); @@ -318,6 +331,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> fetch: fetch, geth_compatibility: cmd.geth_compatibility, remote: event_loop.remote(), + whisper_rpc: whisper_factory, }); let dependencies = rpc::Dependencies { @@ -589,6 +603,18 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R .map_err(|e| format!("Stratum start error: {:?}", e))?; } + let mut attached_protos = Vec::new(); + + let whisper_factory = if cmd.whisper.enabled { + let (whisper_net, whisper_factory) = ::whisper::setup(cmd.whisper.target_message_pool_size) + .map_err(|e| format!("Failed to initialize whisper: {}", e))?; + + attached_protos.push(whisper_net); + whisper_factory + } else { + None + }; + // create sync object let (sync_provider, manage_network, chain_notify) = modules::sync( &mut hypervisor, @@ -598,6 +624,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R snapshot_service.clone(), client.clone(), &cmd.logger_config, + attached_protos, ).map_err(|e| format!("Sync error: {}", e))?; service.add_notify(chain_notify.clone()); @@ -681,6 +708,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R ws_address: cmd.ws_conf.address(), fetch: fetch.clone(), remote: event_loop.remote(), + whisper_rpc: whisper_factory, }); let dependencies = rpc::Dependencies { diff --git a/parity/sync.rs b/parity/sync.rs index 3013c29de..46bfdc72d 100644 --- a/parity/sync.rs +++ b/parity/sync.rs @@ -52,11 +52,12 @@ pub fn main() { let remote_provider = dependency!(LightProviderClient, &service_urls::with_base(&service_config.io_path, service_urls::LIGHT_PROVIDER)); let sync = EthSync::new(Params { - config: service_config.sync, - chain: remote_client.service().clone(), - snapshot_service: remote_snapshot.service().clone(), + config: service_config.sync, + chain: remote_client.service().clone(), + snapshot_service: remote_snapshot.service().clone(), provider: remote_provider.service().clone(), network_config: service_config.net + attached_protos: Vec::new(), }).unwrap(); let _ = boot::main_thread(); diff --git a/parity/whisper.rs b/parity/whisper.rs new file mode 100644 index 000000000..cab117ae6 --- /dev/null +++ b/parity/whisper.rs @@ -0,0 +1,77 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::sync::Arc; +use std::io; + +use ethsync::AttachedProtocol; +use parity_rpc::Metadata; +use parity_whisper::net::{self as whisper_net, PoolHandle, Network as WhisperNetwork}; +use parity_whisper::rpc::{WhisperClient, FilterManager}; + +/// Whisper config. +#[derive(Debug, PartialEq, Eq)] +pub struct Config { + pub enabled: bool, + pub target_message_pool_size: usize, +} + +impl Default for Config { + fn default() -> Self { + Config { + enabled: false, + target_message_pool_size: 10 * 1024 * 1024, + } + } +} + +/// Factory for standard whisper RPC. +pub struct RpcFactory { + net: Arc>>, + manager: Arc, +} + +impl RpcFactory { + pub fn make_handler(&self) -> WhisperClient { + WhisperClient::new(self.net.handle(), self.manager.clone()) + } +} + +/// Sets up whisper protocol and RPC handler. +/// +/// Will target the given pool size. +#[cfg(not(feature = "ipc"))] +pub fn setup(target_pool_size: usize) -> io::Result<(AttachedProtocol, Option)> { + let manager = Arc::new(FilterManager::new()?); + let net = Arc::new(WhisperNetwork::new(target_pool_size, manager.clone())); + + let proto = AttachedProtocol { + handler: net.clone() as Arc<_>, + packet_count: whisper_net::PACKET_COUNT, + versions: whisper_net::SUPPORTED_VERSIONS, + protocol_id: *b"shh", + }; + + let factory = RpcFactory { net: net, manager: manager }; + + Ok((proto, Some(factory))) +} + +// TODO: make it possible to attach generic protocols in IPC. +#[cfg(feature = "ipc")] +pub fn setup(_pool: usize) -> (AttachedProtocol, Option) { + Ok((AttachedProtocol, None)) +} diff --git a/sync/src/api.rs b/sync/src/api.rs index 09511ca1f..b8f495f9b 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use std::collections::{HashMap, BTreeMap}; use std::io; use util::Bytes; -use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, ProtocolId, +use network::{NetworkProtocolHandler, NetworkService, NetworkContext, HostInfo, PeerId, ProtocolId, NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, NetworkError, AllowIP as NetworkAllowIP}; use util::{U256, H256, H512}; @@ -163,6 +163,44 @@ impl From for PipProtocolInfo { } } +/// Configuration to attach alternate protocol handlers. +/// Only works when IPC is disabled. +#[cfg(not(feature = "ipc"))] +pub struct AttachedProtocol { + /// The protocol handler in question. + pub handler: Arc, + /// 3-character ID for the protocol. + pub protocol_id: ProtocolId, + /// Packet count. + pub packet_count: u8, + /// Supported versions. + pub versions: &'static [u8], +} + +/// Attached protocol: disabled in IPC mode. +#[cfg(feature = "ipc")] +#[cfg_attr(feature = "ipc", derive(Binary))] +pub struct AttachedProtocol; + +impl AttachedProtocol { + #[cfg(feature = "ipc")] + fn register(&self, network: &NetworkService) { + let res = network.register_protocol( + self.handler.clone(), + self.protocol_id, + self.packet_count, + self.versions + ); + + if let Err(e) = res { + warn!(target: "sync","Error attaching protocol {:?}", protocol_id); + } + } + + #[cfg(not(feature = "ipc"))] + fn register(&self, _network: &NetworkService) {} +} + /// EthSync initialization parameters. #[cfg_attr(feature = "ipc", derive(Binary))] pub struct Params { @@ -176,6 +214,8 @@ pub struct Params { pub provider: Arc<::light::Provider>, /// Network layer configuration. pub network_config: NetworkConfiguration, + /// Other protocols to attach. + pub attached_protos: Vec, } /// Ethereum network protocol handler @@ -186,6 +226,8 @@ pub struct EthSync { eth_handler: Arc, /// Light (pip) protocol handler light_proto: Option>, + /// Other protocols to attach. + attached_protos: Vec, /// The main subprotocol name subprotocol_name: [u8; 3], /// Light subprotocol name. @@ -243,6 +285,7 @@ impl EthSync { light_proto: light_proto, subprotocol_name: params.config.subprotocol_name, light_subprotocol_name: params.config.light_subprotocol_name, + attached_protos: params.attached_protos, }); Ok(sync) @@ -307,7 +350,7 @@ struct SyncProtocolHandler { } impl NetworkProtocolHandler for SyncProtocolHandler { - fn initialize(&self, io: &NetworkContext) { + fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { io.register_timer(0, 1000).expect("Error registering sync timer"); } @@ -400,6 +443,9 @@ impl ChainNotify for EthSync { self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS) .unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e)); } + + // register any attached protocols. + for proto in &self.attached_protos { proto.register(&self.network) } } fn stop(&self) { @@ -676,12 +722,15 @@ pub struct LightSyncParams { pub subprotocol_name: [u8; 3], /// Other handlers to attach. pub handlers: Vec>, + /// Other subprotocols to run. + pub attached_protos: Vec, } /// Service for light synchronization. pub struct LightSync { proto: Arc, sync: Arc<::light_sync::SyncInfo + Sync + Send>, + attached_protos: Vec, network: NetworkService, subprotocol_name: [u8; 3], network_id: u64, @@ -724,6 +773,7 @@ impl LightSync { Ok(LightSync { proto: light_proto, sync: sync, + attached_protos: params.attached_protos, network: service, subprotocol_name: params.subprotocol_name, network_id: params.network_id, @@ -775,6 +825,8 @@ impl ManageNetwork for LightSync { self.network.register_protocol(light_proto, self.subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS) .unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e)); + + for proto in &self.attached_protos { proto.register(&self.network) } } fn stop_network(&self) { diff --git a/util/bigint/src/hash.rs b/util/bigint/src/hash.rs index 748236285..ca6adf0e7 100644 --- a/util/bigint/src/hash.rs +++ b/util/bigint/src/hash.rs @@ -13,7 +13,7 @@ use std::cmp::{min, Ordering}; use std::ops::{Deref, DerefMut, BitXor, BitAnd, BitOr, IndexMut, Index}; use std::hash::{Hash, Hasher, BuildHasherDefault}; use std::collections::{HashMap, HashSet}; -use rand::Rng; +use rand::{Rand, Rng}; use rand::os::OsRng; use rustc_hex::{FromHex, FromHexError}; use bigint::U256; @@ -91,7 +91,7 @@ macro_rules! impl_hash { /// Assign self have a cryptographically random value. pub fn randomize(&mut self) { let mut rng = OsRng::new().unwrap(); - rng.fill_bytes(&mut self.0); + *self= $from::rand(&mut rng); } /// Get the size of this object in bytes. @@ -360,6 +360,14 @@ macro_rules! impl_hash { $from::from_slice(s) } } + + impl Rand for $from { + fn rand(r: &mut R) -> Self { + let mut hash = $from::new(); + r.fill_bytes(&mut hash.0); + hash + } + } } } diff --git a/util/network/src/host.rs b/util/network/src/host.rs index ec211805e..50c721fb6 100644 --- a/util/network/src/host.rs +++ b/util/network/src/host.rs @@ -1099,7 +1099,10 @@ impl IoHandler for Host { } => { let h = handler.clone(); let reserved = self.reserved_nodes.read(); - h.initialize(&NetworkContext::new(io, *protocol, None, self.sessions.clone(), &reserved)); + h.initialize( + &NetworkContext::new(io, *protocol, None, self.sessions.clone(), &reserved), + &*self.info.read(), + ); self.handlers.write().insert(*protocol, h); let mut info = self.info.write(); for v in versions { diff --git a/util/network/src/lib.rs b/util/network/src/lib.rs index 80654d503..5a3fdc1e6 100644 --- a/util/network/src/lib.rs +++ b/util/network/src/lib.rs @@ -26,7 +26,7 @@ //! struct MyHandler; //! //! impl NetworkProtocolHandler for MyHandler { -//! fn initialize(&self, io: &NetworkContext) { +//! fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { //! io.register_timer(0, 1000); //! } //! @@ -98,13 +98,13 @@ mod ip_utils; #[cfg(test)] mod tests; -pub use host::{PeerId, PacketId, ProtocolId, NetworkContext, NetworkIoMessage, NetworkConfiguration}; +pub use host::{HostInfo, PeerId, PacketId, ProtocolId, NetworkContext, NetworkIoMessage, NetworkConfiguration}; pub use service::NetworkService; pub use error::NetworkError; pub use stats::NetworkStats; pub use session::SessionInfo; -use io::TimerToken; +pub use io::TimerToken; pub use node_table::{is_valid_node_url, NodeId}; const PROTOCOL_VERSION: u32 = 4; @@ -114,7 +114,7 @@ const PROTOCOL_VERSION: u32 = 4; /// `Message` is the type for message data. pub trait NetworkProtocolHandler: Sync + Send { /// Initialize the handler - fn initialize(&self, _io: &NetworkContext) {} + fn initialize(&self, _io: &NetworkContext, _host_info: &HostInfo) {} /// Called when new network packet received. fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]); /// Called when new peer is connected. Only called when peer supports the same protocol. diff --git a/util/network/src/tests.rs b/util/network/src/tests.rs index 692dd94a0..52184061c 100644 --- a/util/network/src/tests.rs +++ b/util/network/src/tests.rs @@ -59,7 +59,7 @@ impl TestProtocol { } impl NetworkProtocolHandler for TestProtocol { - fn initialize(&self, io: &NetworkContext) { + fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { io.register_timer(0, 10).unwrap(); } diff --git a/whisper/Cargo.toml b/whisper/Cargo.toml new file mode 100644 index 000000000..7b74c2990 --- /dev/null +++ b/whisper/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "parity-whisper" +version = "0.1.0" +authors = ["Parity Technologies "] +description = "Whisper Protocol implementation for Parity" + +[dependencies] +bitflags = "0.9" +byteorder = "1.0.0" +ethcore-bigint = { path = "../util/bigint" } +ethcore-network = { path = "../util/network" } +ethcrypto = { path = "../ethcrypto" } +ethkey = { path = "../ethkey" } +futures = "0.1" +hex = "0.2" +log = "0.3" +ordered-float = "0.5" +parking_lot = "0.4" +rand = "0.3" +ring = "0.9.5" +rlp = { path = "../util/rlp" } +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" +slab = "0.3" +smallvec = "0.4" +time = "0.1" +tiny-keccak = "1.2.1" + +jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.7" } +jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.7" } +jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.7" } diff --git a/whisper/README.md b/whisper/README.md new file mode 100644 index 000000000..34a68a1cb --- /dev/null +++ b/whisper/README.md @@ -0,0 +1,3 @@ +# Whisper + +Implementation of Whisper based on the Whisper-v2 PoC. diff --git a/whisper/src/lib.rs b/whisper/src/lib.rs new file mode 100644 index 000000000..e865637ed --- /dev/null +++ b/whisper/src/lib.rs @@ -0,0 +1,59 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Whisper P2P messaging system as a DevP2P subprotocol, with RPC and Rust +//! interface. + +extern crate byteorder; +extern crate ethcore_bigint as bigint; +extern crate ethcore_network as network; +extern crate ethcrypto; +extern crate ethkey; +extern crate futures; +extern crate hex; +extern crate ordered_float; +extern crate parking_lot; +extern crate rand; +extern crate rlp; +extern crate ring; +extern crate serde; +extern crate serde_json; +extern crate slab; +extern crate smallvec; +extern crate time; +extern crate tiny_keccak; + +extern crate jsonrpc_core; +extern crate jsonrpc_pubsub; + +#[macro_use] +extern crate bitflags; + +#[macro_use] +extern crate log; + +#[macro_use] +extern crate jsonrpc_macros; + +#[macro_use] +extern crate serde_derive; + +pub use self::message::Message; +pub use self::net::{Network, MessageHandler}; + +pub mod message; +pub mod net; +pub mod rpc; diff --git a/whisper/src/message.rs b/whisper/src/message.rs new file mode 100644 index 000000000..e5202c583 --- /dev/null +++ b/whisper/src/message.rs @@ -0,0 +1,479 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Whisper message parsing, handlers, and construction. + +use std::fmt; +use std::time::{self, SystemTime, Duration}; + +use bigint::hash::{H256, H512}; +use rlp::{self, DecoderError, RlpStream, UntrustedRlp}; +use smallvec::SmallVec; +use tiny_keccak::{keccak256, Keccak}; + +/// Work-factor proved. Takes 3 parameters: size of message, time to live, +/// and hash. +/// +/// Panics if size or TTL is zero. +pub fn work_factor_proved(size: u64, ttl: u64, hash: H256) -> f64 { + assert!(size != 0 && ttl != 0); + + let leading_zeros = { + let leading_zeros = hash.iter().take_while(|&&x| x == 0).count(); + (leading_zeros * 8) + hash.get(leading_zeros + 1).map_or(0, |b| b.leading_zeros() as usize) + }; + let spacetime = size as f64 * ttl as f64; + + (1u64 << leading_zeros) as f64 / spacetime +} + +/// A topic of a message. +#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct Topic(pub [u8; 4]); + +impl From<[u8; 4]> for Topic { + fn from(x: [u8; 4]) -> Self { + Topic(x) + } +} + +impl Topic { + /// set up to three bits in the 64-byte bloom passed. + /// + /// this takes 3 sets of 9 bits, treating each as an index in the range + /// 0..512 into the bloom and setting the corresponding bit in the bloom to 1. + pub fn bloom_into(&self, bloom: &mut H512) { + let mut set_bit = |idx: usize| { + let idx = idx & 511; + bloom[idx / 8] |= 1 << idx % 8; + }; + + let data = &self.0; + let mut combined = ((data[0] as usize) << 24) | + ((data[1] as usize) << 16) | + ((data[2] as usize) << 8) | + data[3] as usize; + + // take off the last 5 bits as we only use 27. + combined >>= 5; + + set_bit(combined); + set_bit(combined >> 9); + set_bit(combined >> 18); + } + + /// Get bloom for single topic. + pub fn bloom(&self) -> H512 { + let mut bloom = Default::default(); + self.bloom_into(&mut bloom); + bloom + } +} + +impl rlp::Encodable for Topic { + fn rlp_append(&self, s: &mut RlpStream) { + s.encoder().encode_value(&self.0); + } +} + +impl rlp::Decodable for Topic { + fn decode(rlp: &UntrustedRlp) -> Result { + use std::cmp; + + rlp.decoder().decode_value(|bytes| match bytes.len().cmp(&4) { + cmp::Ordering::Less => Err(DecoderError::RlpIsTooShort), + cmp::Ordering::Greater => Err(DecoderError::RlpIsTooBig), + cmp::Ordering::Equal => { + let mut t = [0u8; 4]; + t.copy_from_slice(bytes); + Ok(Topic(t)) + } + }) + } +} + +/// Calculate union of blooms for given topics. +pub fn bloom_topics(topics: &[Topic]) -> H512 { + let mut bloom = H512::default(); + for topic in topics { + topic.bloom_into(&mut bloom); + } + bloom +} + +/// Message errors. +#[derive(Debug)] +pub enum Error { + Decoder(DecoderError), + LivesTooLong, + IssuedInFuture, + ZeroTTL, +} + +impl From for Error { + fn from(err: DecoderError) -> Self { + Error::Decoder(err) + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Error::Decoder(ref err) => write!(f, "Failed to decode message: {}", err), + Error::LivesTooLong => write!(f, "Message claims to be issued before the unix epoch."), + Error::IssuedInFuture => write!(f, "Message issued in future."), + Error::ZeroTTL => write!(f, "Message live for zero time."), + } + } +} + +// Raw envelope struct. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Envelope { + /// Expiry timestamp + pub expiry: u64, + /// Time-to-live in seconds + pub ttl: u64, + /// series of 4-byte topics. + pub topics: SmallVec<[Topic; 4]>, + /// The message contained within. + pub data: Vec, + /// Arbitrary value used to target lower PoW hash. + pub nonce: u64, +} + +impl Envelope { + fn proving_hash(&self) -> H256 { + use byteorder::{BigEndian, ByteOrder}; + + let mut buf = [0; 32]; + + let mut stream = RlpStream::new_list(4); + stream.append(&self.expiry) + .append(&self.ttl) + .append_list(&self.topics) + .append(&self.data); + + let mut digest = Keccak::new_keccak256(); + digest.update(&*stream.drain()); + digest.update(&{ + let mut nonce_bytes = [0u8; 8]; + BigEndian::write_u64(&mut nonce_bytes, self.nonce); + + nonce_bytes + }); + + digest.finalize(&mut buf); + H256(buf) + } +} + +impl rlp::Encodable for Envelope { + fn rlp_append(&self, s: &mut RlpStream) { + s.begin_list(5) + .append(&self.expiry) + .append(&self.ttl) + .append_list(&self.topics) + .append(&self.data) + .append(&self.nonce); + } +} + +impl rlp::Decodable for Envelope { + fn decode(rlp: &UntrustedRlp) -> Result { + if rlp.item_count()? != 5 { return Err(DecoderError::RlpIncorrectListLen) } + + Ok(Envelope { + expiry: rlp.val_at(0)?, + ttl: rlp.val_at(1)?, + topics: rlp.at(2)?.iter().map(|x| x.as_val()).collect::>()?, + data: rlp.val_at(3)?, + nonce: rlp.val_at(4)?, + }) + } +} + +/// Message creation parameters. +/// Pass this to `Message::create` to make a message. +pub struct CreateParams { + /// time-to-live in seconds. + pub ttl: u64, + /// payload data. + pub payload: Vec, + /// Topics. + pub topics: Vec, + /// How many milliseconds to spend proving work. + pub work: u64, +} + +/// A whisper message. This is a checked message carrying around metadata. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Message { + envelope: Envelope, + bloom: H512, + hash: H256, + encoded_size: usize, +} + +impl Message { + /// Create a message from creation parameters. + /// Panics if TTL is 0. + pub fn create(params: CreateParams) -> Self { + use byteorder::{BigEndian, ByteOrder}; + use rand::{Rng, SeedableRng, XorShiftRng}; + + let mut rng = { + let mut thread_rng = ::rand::thread_rng(); + + XorShiftRng::from_seed(thread_rng.gen::<[u32; 4]>()) + }; + + assert!(params.ttl > 0); + + let expiry = { + let after_mining = SystemTime::now() + Duration::from_millis(params.work); + let since_epoch = after_mining.duration_since(time::UNIX_EPOCH) + .expect("time after now is after unix epoch; qed"); + + // round up the sub-second to next whole second. + since_epoch.as_secs() + if since_epoch.subsec_nanos() == 0 { 0 } else { 1 } + }; + + let start_digest = { + let mut stream = RlpStream::new_list(4); + stream.append(&expiry) + .append(¶ms.ttl) + .append_list(¶ms.topics) + .append(¶ms.payload); + + let mut digest = Keccak::new_keccak256(); + digest.update(&*stream.drain()); + digest + }; + + let mut buf = [0; 32]; + let mut try_nonce = move |nonce: &[u8; 8]| { + let mut digest = start_digest.clone(); + digest.update(&nonce[..]); + digest.finalize(&mut buf[..]); + + buf.clone() + }; + + let mut nonce: [u8; 8] = rng.gen(); + let mut best_found = try_nonce(&nonce); + + let start = ::time::precise_time_ns(); + + while ::time::precise_time_ns() <= start + params.work * 1_000_000 { + let temp_nonce = rng.gen(); + let hash = try_nonce(&temp_nonce); + + if hash < best_found { + nonce = temp_nonce; + best_found = hash; + } + } + + let envelope = Envelope { + expiry: expiry, + ttl: params.ttl, + topics: params.topics.into_iter().collect(), + data: params.payload, + nonce: BigEndian::read_u64(&nonce[..]), + }; + + debug_assert_eq!(H256(best_found.clone()), envelope.proving_hash()); + + let encoded = ::rlp::encode(&envelope); + + Message::from_components( + envelope, + encoded.len(), + H256(keccak256(&encoded)), + SystemTime::now(), + ).expect("Message generated here known to be valid; qed") + } + + /// Decode message from RLP and check for validity against system time. + pub fn decode(rlp: UntrustedRlp, now: SystemTime) -> Result { + let envelope: Envelope = rlp.as_val()?; + let encoded_size = rlp.as_raw().len(); + let hash = H256(keccak256(rlp.as_raw())); + + Message::from_components(envelope, encoded_size, hash, now) + } + + // create message from envelope, hash, and encoded size. + // does checks for validity. + fn from_components(envelope: Envelope, size: usize, hash: H256, now: SystemTime) + -> Result + { + const LEEWAY_SECONDS: u64 = 2; + + if envelope.expiry <= envelope.ttl { return Err(Error::LivesTooLong) } + if envelope.ttl == 0 { return Err(Error::ZeroTTL) } + + let issue_time_adjusted = Duration::from_secs( + (envelope.expiry - envelope.ttl).saturating_sub(LEEWAY_SECONDS) + ); + + if time::UNIX_EPOCH + issue_time_adjusted > now { + return Err(Error::IssuedInFuture); + } + + // other validity checks? + let bloom = bloom_topics(&envelope.topics); + + Ok(Message { + envelope: envelope, + bloom: bloom, + hash: hash, + encoded_size: size, + }) + } + + /// Get a reference to the envelope. + pub fn envelope(&self) -> &Envelope { + &self.envelope + } + + /// Get the encoded size of the envelope. + pub fn encoded_size(&self) -> usize { + self.encoded_size + } + + /// Get a uniquely identifying hash for the message. + pub fn hash(&self) -> &H256 { + &self.hash + } + + /// Get the bloom filter of the topics + pub fn bloom(&self) -> &H512 { + &self.bloom + } + + /// Get the work proved by the hash. + pub fn work_proved(&self) -> f64 { + let proving_hash = self.envelope.proving_hash(); + + work_factor_proved(self.encoded_size as _, self.envelope.ttl, proving_hash) + } + + /// Get the expiry time. + pub fn expiry(&self) -> SystemTime { + time::UNIX_EPOCH + Duration::from_secs(self.envelope.expiry) + } + + /// Get the topics. + pub fn topics(&self) -> &[Topic] { + &self.envelope.topics + } + + /// Get the message data. + pub fn data(&self) -> &[u8] { + &self.envelope.data + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::{self, Duration, SystemTime}; + use rlp::UntrustedRlp; + + fn unix_time(x: u64) -> SystemTime { + time::UNIX_EPOCH + Duration::from_secs(x) + } + + #[test] + fn create_message() { + let _ = Message::create(CreateParams { + ttl: 100, + payload: vec![1, 2, 3, 4], + topics: Vec::new(), + work: 50, + }); + } + + #[test] + fn round_trip() { + let envelope = Envelope { + expiry: 100_000, + ttl: 30, + data: vec![9; 256], + topics: Default::default(), + nonce: 1010101, + }; + + let encoded = ::rlp::encode(&envelope); + let decoded = ::rlp::decode(&encoded); + + assert_eq!(envelope, decoded) + } + + #[test] + fn passes_checks() { + let envelope = Envelope { + expiry: 100_000, + ttl: 30, + data: vec![9; 256], + topics: Default::default(), + nonce: 1010101, + }; + + let encoded = ::rlp::encode(&envelope); + + for i in 0..30 { + let now = unix_time(100_000 - i); + Message::decode(UntrustedRlp::new(&*encoded), now).unwrap(); + } + } + + #[test] + #[should_panic] + fn future_message() { + let envelope = Envelope { + expiry: 100_000, + ttl: 30, + data: vec![9; 256], + topics: Default::default(), + nonce: 1010101, + }; + + let encoded = ::rlp::encode(&envelope); + + let now = unix_time(100_000 - 1_000); + Message::decode(UntrustedRlp::new(&*encoded), now).unwrap(); + } + + #[test] + #[should_panic] + fn pre_epoch() { + let envelope = Envelope { + expiry: 100_000, + ttl: 200_000, + data: vec![9; 256], + topics: Default::default(), + nonce: 1010101, + }; + + let encoded = ::rlp::encode(&envelope); + + let now = unix_time(95_000); + Message::decode(UntrustedRlp::new(&*encoded), now).unwrap(); + } +} diff --git a/whisper/src/net.rs b/whisper/src/net.rs new file mode 100644 index 000000000..5553f27e6 --- /dev/null +++ b/whisper/src/net.rs @@ -0,0 +1,638 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Whisper messaging system as a DevP2P subprotocol. + +use std::collections::{HashMap, HashSet}; +use std::cmp::Ordering; +use std::fmt; +use std::time::{Duration, SystemTime}; +use std::sync::Arc; + +use bigint::hash::{H256, H512}; +use network::{HostInfo, NetworkContext, NetworkError, NodeId, PeerId, TimerToken}; +use ordered_float::OrderedFloat; +use parking_lot::{Mutex, RwLock}; +use rlp::{DecoderError, RlpStream, UntrustedRlp}; + +use message::{Message, Error as MessageError}; + +const RALLY_TOKEN: TimerToken = 1; +const RALLY_TIMEOUT_MS: u64 = 750; // supposed to be at least once per second. + +const PROTOCOL_VERSION: usize = 2; + +/// Supported protocol versions. +pub const SUPPORTED_VERSIONS: &'static [u8] = &[PROTOCOL_VERSION as u8]; + +// maximum tolerated delay between messages packets. +const MAX_TOLERATED_DELAY_MS: u64 = 2000; + +/// Number of packets. +pub const PACKET_COUNT: u8 = 3; + +mod packet { + pub const STATUS: u8 = 0; + pub const MESSAGES: u8 = 1; + pub const TOPIC_FILTER: u8 = 2; +} + +/// Handles messages within a single packet. +pub trait MessageHandler: Send + Sync { + /// Evaluate the message and handle it. + /// + /// The same message will not be passed twice. + /// Heavy handling should be done asynchronously. + /// If there is a significant overhead in this thread, then an attacker + /// can determine which kinds of messages we are listening for. + fn handle_messages(&self, message: &[Message]); +} + +// errors in importing a whisper message. +#[derive(Debug)] +enum Error { + Decoder(DecoderError), + Network(NetworkError), + Message(MessageError), + UnknownPacket(u8), + UnknownPeer(PeerId), + ProtocolVersionMismatch(usize), + SameNodeKey, + UnexpectedMessage, +} + +impl From for Error { + fn from(err: DecoderError) -> Self { + Error::Decoder(err) + } +} + +impl From for Error { + fn from(err: NetworkError) -> Self { + Error::Network(err) + } +} + +impl From for Error { + fn from(err: MessageError) -> Self { + Error::Message(err) + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Error::Decoder(ref err) => write!(f, "Failed to decode packet: {}", err), + Error::Network(ref err) => write!(f, "Network error: {}", err), + Error::Message(ref err) => write!(f, "Error decoding message: {}", err), + Error::UnknownPacket(ref id) => write!(f, "Unknown packet kind: {}", id), + Error::UnknownPeer(ref id) => write!(f, "Message received from unknown peer: {}", id), + Error::ProtocolVersionMismatch(ref proto) => + write!(f, "Unknown protocol version: {}", proto), + Error::UnexpectedMessage => write!(f, "Unexpected message."), + Error::SameNodeKey => write!(f, "Peer and us have same node key."), + } + } +} + +// sorts by work proved, descending. +#[derive(PartialEq, Eq)] +struct SortedEntry { + slab_id: usize, + work_proved: OrderedFloat, + expiry: SystemTime, +} + +impl Ord for SortedEntry { + fn cmp(&self, other: &SortedEntry) -> Ordering { + self.work_proved.cmp(&other.work_proved) + } +} + +impl PartialOrd for SortedEntry { + fn partial_cmp(&self, other: &SortedEntry) -> Option { + Some(self.cmp(other)) + } +} + +// stores messages by two metrics: expiry and PoW rating +// when full, will accept messages above the minimum stored. +struct Messages { + slab: ::slab::Slab, + sorted: Vec, + known: HashSet, + removed_hashes: Vec, + cumulative_size: usize, + ideal_size: usize, +} + +impl Messages { + fn new(ideal_size: usize) -> Self { + Messages { + slab: ::slab::Slab::with_capacity(0), + sorted: Vec::new(), + known: HashSet::new(), + removed_hashes: Vec::new(), + cumulative_size: 0, + ideal_size: ideal_size, + } + } + + // reserve space for additional elements. + fn reserve(&mut self, additional: usize) { + self.slab.reserve_exact(additional); + self.sorted.reserve(additional); + self.known.reserve(additional); + } + + // whether a message is not known and within the bounds of PoW. + fn may_accept(&self, message: &Message) -> bool { + !self.known.contains(message.hash()) && { + self.sorted.last().map_or(true, |entry| { + let work_proved = OrderedFloat(message.work_proved()); + OrderedFloat(self.slab[entry.slab_id].work_proved()) < work_proved + }) + } + } + + // insert a message into the store. for best performance, + // call `reserve` before inserting a bunch. + // + fn insert(&mut self, message: Message) -> bool { + if !self.known.insert(message.hash().clone()) { return false } + + let work_proved = OrderedFloat(message.work_proved()); + + // pop off entries by low PoW until we have enough space for the higher + // PoW message being inserted. + let size_upon_insertion = self.cumulative_size + message.encoded_size(); + if size_upon_insertion >= self.ideal_size { + let diff = size_upon_insertion - self.ideal_size; + let mut found_diff = 0; + for entry in self.sorted.iter().rev() { + if found_diff >= diff { break } + + // if we encounter a message with at least the PoW we're looking + // at, don't push that message out. + if entry.work_proved >= work_proved { return false } + found_diff += self.slab[entry.slab_id].encoded_size(); + } + + // message larger than ideal size. + if found_diff < diff { return false } + + while found_diff > 0 { + let entry = self.sorted.pop() + .expect("found_diff built by traversing entries; therefore that many entries exist; qed"); + + let message = self.slab.remove(entry.slab_id) + .expect("sorted entry slab IDs always filled; qed"); + + found_diff -= message.encoded_size(); + + self.cumulative_size -= message.encoded_size(); + self.known.remove(message.hash()); + self.removed_hashes.push(message.hash().clone()); + } + } + + let expiry = message.expiry(); + + self.cumulative_size += message.encoded_size(); + + if !self.slab.has_available() { self.slab.reserve_exact(1) } + let id = self.slab.insert(message).expect("just ensured enough space in slab; qed"); + + let sorted_entry = SortedEntry { + slab_id: id, + work_proved: work_proved, + expiry: expiry, + }; + + match self.sorted.binary_search(&sorted_entry) { + Ok(idx) | Err(idx) => self.sorted.insert(idx, sorted_entry), + } + + true + } + + // prune expired messages, and then prune low proof-of-work messages + // until below ideal size. + fn prune(&mut self, now: SystemTime) -> Vec { + { + let slab = &mut self.slab; + let known = &mut self.known; + let cumulative_size = &mut self.cumulative_size; + let ideal_size = &self.ideal_size; + let removed = &mut self.removed_hashes; + + // first pass, we look just at expired entries. + let all_expired = self.sorted.iter() + .filter(|entry| entry.expiry <= now) + .map(|x| (true, x)); + + // second pass, we look at entries which aren't expired but in order + // by PoW + let low_proof = self.sorted.iter().rev() + .filter(|entry| entry.expiry > now) + .map(|x| (false, x)); + + for (is_expired, entry) in all_expired.chain(low_proof) { + // break once we've removed all expired entries + // or have taken enough low-work entries. + if !is_expired && *cumulative_size <= *ideal_size { + break + } + + let message = slab.remove(entry.slab_id) + .expect("references to ID kept upon creation; only destroyed upon removal; qed"); + + known.remove(message.hash()); + removed.push(message.hash().clone()); + + *cumulative_size -= message.encoded_size(); + } + } + + // clear all the sorted entries we removed from slab. + let slab = &self.slab; + self.sorted.retain(|entry| slab.contains(entry.slab_id)); + + ::std::mem::replace(&mut self.removed_hashes, Vec::new()) + } + + fn iter(&self) -> ::slab::Iter { + self.slab.iter() + } + + fn is_full(&self) -> bool { + self.cumulative_size >= self.ideal_size + } + + fn status(&self) -> PoolStatus { + PoolStatus { + required_pow: if self.is_full() { + self.sorted.last().map(|entry| entry.work_proved.0) + } else { + None + }, + message_count: self.sorted.len(), + cumulative_size: self.cumulative_size, + target_size: self.ideal_size, + } + } +} + +enum State { + Unconfirmed(SystemTime), // awaiting status packet. + TheirTurn(SystemTime), // it has been their turn to send since stored time. + OurTurn, +} + +struct Peer { + node_key: NodeId, + state: State, + known_messages: HashSet, + topic_filter: Option, +} + +impl Peer { + // note that a message has been evicted from the queue. + fn note_evicted(&mut self, messages: &[H256]) { + for message_hash in messages { + self.known_messages.remove(message_hash); + } + } + + // whether this peer will accept the message. + fn will_accept(&self, message: &Message) -> bool { + let known = self.known_messages.contains(message.hash()); + + let matches_bloom = self.topic_filter.as_ref() + .map_or(true, |topic| topic & message.bloom() == message.bloom().clone()); + + !known && matches_bloom + } + + // note a message as known. returns true if it was already + // known, false otherwise. + fn note_known(&mut self, message: &Message) -> bool { + self.known_messages.insert(message.hash().clone()) + } + + fn set_topic_filter(&mut self, topic: H512) { + self.topic_filter = Some(topic); + } + + fn can_send_messages(&self) -> bool { + match self.state { + State::Unconfirmed(_) | State::OurTurn => false, + State::TheirTurn(_) => true, + } + } +} + +/// Pool status. +pub struct PoolStatus { + /// Required PoW to be accepted into the pool + pub required_pow: Option, + /// Number of messages in the pool. + pub message_count: usize, + /// Cumulative size of the messages in the pool + pub cumulative_size: usize, + /// Target size of the pool. + pub target_size: usize, +} + +/// Handle to the pool, for posting messages or getting info. +#[derive(Clone)] +pub struct PoolHandle { + messages: Arc>, +} + +impl PoolHandle { + /// Post a message to the whisper network to be relayed. + pub fn post_message(&self, message: Message) -> bool { + self.messages.write().insert(message) + } + + /// Get number of messages and amount of memory used by them. + pub fn pool_status(&self) -> PoolStatus { + self.messages.read().status() + } +} + +/// The whisper network protocol handler. +pub struct Network { + messages: Arc>, + handler: T, + peers: RwLock>>, + node_key: RwLock, +} + +// public API. +impl Network { + /// Create a new network handler. + pub fn new(messages_size_bytes: usize, handler: T) -> Self { + Network { + messages: Arc::new(RwLock::new(Messages::new(messages_size_bytes))), + handler: handler, + peers: RwLock::new(HashMap::new()), + node_key: RwLock::new(Default::default()), + } + } + + /// Acquire a sender to asynchronously feed messages to the whisper + /// network. + pub fn handle(&self) -> PoolHandle { + PoolHandle { messages: self.messages.clone() } + } +} + +impl Network { + fn rally(&self, io: &NetworkContext) { + // cannot be greater than 16MB (protocol limitation) + const MAX_MESSAGES_PACKET_SIZE: usize = 8 * 1024 * 1024; + + // prune messages. + let now = SystemTime::now(); + let pruned_hashes = self.messages.write().prune(now); + + let messages = self.messages.read(); + let peers = self.peers.read(); + + // send each peer a packet with new messages it may find relevant. + for (peer_id, peer) in peers.iter() { + let mut peer_data = peer.lock(); + peer_data.note_evicted(&pruned_hashes); + + let punish_timeout = |last_activity: &SystemTime| { + if *last_activity + Duration::from_millis(MAX_TOLERATED_DELAY_MS) <= now { + debug!(target: "whisper", "Disconnecting peer {} due to excessive timeout.", peer_id); + io.disconnect_peer(*peer_id); + } + }; + + // check timeouts and skip peers who we can't send a rally to. + match peer_data.state { + State::Unconfirmed(ref time) | State::TheirTurn(ref time) => { + punish_timeout(time); + continue; + } + State::OurTurn => {} + } + + // construct packet, skipping messages the peer won't accept. + let mut stream = RlpStream::new(); + stream.begin_unbounded_list(); + + for message in messages.iter() { + if !peer_data.will_accept(message) { continue } + + if stream.estimate_size(message.encoded_size()) > MAX_MESSAGES_PACKET_SIZE { + break; + } + + peer_data.note_known(message); + stream.append(message.envelope()); + } + + stream.complete_unbounded_list(); + + peer_data.state = State::TheirTurn(SystemTime::now()); + if let Err(e) = io.send(*peer_id, packet::MESSAGES, stream.out()) { + debug!(target: "whisper", "Failed to send messages packet to peer {}: {}", peer_id, e); + io.disconnect_peer(*peer_id); + } + } + } + + // handle status packet from peer. + fn on_status(&self, peer: &PeerId, status: UntrustedRlp) + -> Result<(), Error> + { + let proto: usize = status.as_val()?; + if proto != PROTOCOL_VERSION { return Err(Error::ProtocolVersionMismatch(proto)) } + + let peers = self.peers.read(); + match peers.get(peer) { + Some(peer) => { + let mut peer = peer.lock(); + let our_node_key = self.node_key.read().clone(); + + // handle this basically impossible edge case gracefully. + if peer.node_key == our_node_key { + return Err(Error::SameNodeKey); + } + + // peer with lower node key begins the rally. + if peer.node_key > our_node_key { + peer.state = State::OurTurn; + } else { + peer.state = State::TheirTurn(SystemTime::now()); + } + + Ok(()) + } + None => { + debug!(target: "whisper", "Received message from unknown peer."); + Err(Error::UnknownPeer(*peer)) + } + } + } + + fn on_messages(&self, peer: &PeerId, message_packet: UntrustedRlp) + -> Result<(), Error> + { + let mut messages_vec = { + let peers = self.peers.read(); + let peer = match peers.get(peer) { + Some(peer) => peer, + None => { + debug!(target: "whisper", "Received message from unknown peer."); + return Err(Error::UnknownPeer(*peer)); + } + }; + + let mut peer = peer.lock(); + + if !peer.can_send_messages() { + return Err(Error::UnexpectedMessage); + } + + peer.state = State::OurTurn; + + let now = SystemTime::now(); + let mut messages_vec = message_packet.iter().map(|rlp| Message::decode(rlp, now)) + .collect::, _>>()?; + + if messages_vec.is_empty() { return Ok(()) } + + // disallow duplicates in packet. + messages_vec.retain(|message| peer.note_known(&message)); + messages_vec + }; + + // import for relaying. + let mut messages = self.messages.write(); + + messages_vec.retain(|message| messages.may_accept(&message)); + messages.reserve(messages_vec.len()); + + self.handler.handle_messages(&messages_vec); + + for message in messages_vec { + messages.insert(message); + } + + Ok(()) + } + + fn on_topic_filter(&self, peer: &PeerId, filter: UntrustedRlp) + -> Result<(), Error> + { + let peers = self.peers.read(); + match peers.get(peer) { + Some(peer) => { + let mut peer = peer.lock(); + + if let State::Unconfirmed(_) = peer.state { + return Err(Error::UnexpectedMessage); + } + + peer.set_topic_filter(filter.as_val()?) + } + None => { + debug!(target: "whisper", "Received message from unknown peer."); + return Err(Error::UnknownPeer(*peer)); + } + } + + Ok(()) + } + + fn on_connect(&self, io: &NetworkContext, peer: &PeerId) { + trace!(target: "whisper", "Connecting peer {}", peer); + + let node_key = match io.session_info(*peer).and_then(|info| info.id) { + Some(node_key) => node_key, + None => { + debug!(target: "whisper", "Disconnecting peer {}, who has no node key.", peer); + io.disable_peer(*peer); + return; + } + }; + + self.peers.write().insert(*peer, Mutex::new(Peer { + node_key: node_key, + state: State::Unconfirmed(SystemTime::now()), + known_messages: HashSet::new(), + topic_filter: None, + })); + + if let Err(e) = io.send(*peer, packet::STATUS, ::rlp::encode(&PROTOCOL_VERSION).to_vec()) { + debug!(target: "whisper", "Error sending status: {}", e); + io.disconnect_peer(*peer); + } + } + + fn on_disconnect(&self, peer: &PeerId) { + trace!(target: "whisper", "Disconnecting peer {}", peer); + let _ = self.peers.write().remove(peer); + } +} + +impl ::network::NetworkProtocolHandler for Network { + fn initialize(&self, io: &NetworkContext, host_info: &HostInfo) { + // set up broadcast timer (< 1s) + io.register_timer(RALLY_TOKEN, RALLY_TIMEOUT_MS) + .expect("Failed to initialize message rally timer"); + + *self.node_key.write() = host_info.id().clone(); + } + + fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { + let rlp = UntrustedRlp::new(data); + let res = match packet_id { + packet::STATUS => self.on_status(peer, rlp), + packet::MESSAGES => self.on_messages(peer, rlp), + packet::TOPIC_FILTER => self.on_topic_filter(peer, rlp), + other => Err(Error::UnknownPacket(other)), + }; + + if let Err(e) = res { + trace!(target: "whisper", "Disabling peer due to misbehavior: {}", e); + io.disable_peer(*peer); + } + } + + fn connected(&self, io: &NetworkContext, peer: &PeerId) { + // peer with higher ID should begin rallying. + self.on_connect(io, peer) + } + + fn disconnected(&self, _io: &NetworkContext, peer: &PeerId) { + self.on_disconnect(peer) + } + + fn timeout(&self, io: &NetworkContext, timer: TimerToken) { + // rally with each peer and handle timeouts. + match timer { + RALLY_TOKEN => self.rally(io), + other => debug!(target: "whisper", "Timout triggered on unknown token {}", other), + } + } +} diff --git a/whisper/src/rpc/crypto.rs b/whisper/src/rpc/crypto.rs new file mode 100644 index 000000000..19c26fda8 --- /dev/null +++ b/whisper/src/rpc/crypto.rs @@ -0,0 +1,316 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Encryption schemes supported by RPC layer. + +use bigint::hash::H256; +use ethkey::{self, Public, Secret}; +use ring::aead::{self, AES_256_GCM, SealingKey, OpeningKey}; + +/// Length of AES key +pub const AES_KEY_LEN: usize = 32; +/// Length of AES nonce (IV) +pub const AES_NONCE_LEN: usize = 12; + +// nonce used for encryption when broadcasting +const BROADCAST_IV: [u8; AES_NONCE_LEN] = [0xff; AES_NONCE_LEN]; + +// how to encode aes key/nonce. +enum AesEncode { + AppendedNonce, // receiver known, random nonce appended. + OnTopics(Vec), // receiver knows topics but not key. nonce global. +} + +enum EncryptionInner { + AES([u8; AES_KEY_LEN], [u8; AES_NONCE_LEN], AesEncode), + ECIES(Public), +} + +/// Encryption good for single usage. +pub struct EncryptionInstance(EncryptionInner); + +impl EncryptionInstance { + /// ECIES encryption using public key. Fails if invalid public key. + pub fn ecies(public: Public) -> Result { + if !ethkey::public_is_valid(&public) { + return Err("Invalid public key"); + } + + Ok(EncryptionInstance(EncryptionInner::ECIES(public))) + } + + /// 256-bit AES GCM encryption with given nonce. + /// It is extremely insecure to reuse nonces. + /// + /// If generating nonces with a secure RNG, limit uses such that + /// the chance of collision is negligible. + pub fn aes(key: [u8; AES_KEY_LEN], nonce: [u8; AES_NONCE_LEN]) -> Self { + EncryptionInstance(EncryptionInner::AES(key, nonce, AesEncode::AppendedNonce)) + } + + /// Broadcast encryption for the message based on the given topics. + /// + /// Key reuse here is extremely dangerous. It should be randomly generated + /// with a secure RNG. + pub fn broadcast(key: [u8; AES_KEY_LEN], topics: Vec) -> Self { + EncryptionInstance(EncryptionInner::AES(key, BROADCAST_IV, AesEncode::OnTopics(topics))) + } + + /// Encrypt the supplied plaintext + pub fn encrypt(self, plain: &[u8]) -> Vec { + match self.0 { + EncryptionInner::AES(key, nonce, encode) => { + let sealing_key = SealingKey::new(&AES_256_GCM, &key) + .expect("key is of correct len; qed"); + + let encrypt_plain = move |buf: &mut Vec| { + let out_suffix_capacity = AES_256_GCM.tag_len(); + + let prepend_len = buf.len(); + buf.extend(plain); + + buf.resize(prepend_len + plain.len() + out_suffix_capacity, 0); + + let out_size = aead::seal_in_place( + &sealing_key, + &nonce, + &[], // no authenticated data. + &mut buf[prepend_len..], + out_suffix_capacity, + ).expect("key, nonce, buf are valid and out suffix large enough; qed"); + + // truncate to the output size and return. + buf.truncate(prepend_len + out_size); + }; + + match encode { + AesEncode::AppendedNonce => { + let mut buf = Vec::new(); + encrypt_plain(&mut buf); + buf.extend(&nonce[..]); + buf + } + AesEncode::OnTopics(topics) => { + let mut buf = Vec::new(); + let key = H256(key); + + for topic in topics { + buf.extend(&*(topic ^ key)); + } + + encrypt_plain(&mut buf); + buf + } + } + } + EncryptionInner::ECIES(valid_public) => { + ::ethcrypto::ecies::encrypt(&valid_public, &[], plain) + .expect("validity of public key an invariant of the type; qed") + } + } + } +} + +enum AesExtract { + AppendedNonce([u8; AES_KEY_LEN]), // extract appended nonce. + OnTopics(usize, usize, H256), // number of topics, index we know, topic we know. +} + +enum DecryptionInner { + AES(AesExtract), + ECIES(Secret), +} + +/// Decryption instance good for single usage. +pub struct DecryptionInstance(DecryptionInner); + +impl DecryptionInstance { + /// ECIES decryption using secret key. Fails if invalid secret. + pub fn ecies(secret: Secret) -> Result { + secret.check_validity().map_err(|_| "Invalid secret key")?; + + Ok(DecryptionInstance(DecryptionInner::ECIES(secret))) + } + + /// 256-bit AES GCM decryption with appended nonce. + pub fn aes(key: [u8; AES_KEY_LEN]) -> Self { + DecryptionInstance(DecryptionInner::AES(AesExtract::AppendedNonce(key))) + } + + /// Decode broadcast based on number of topics and known topic. + /// Known topic index may not be larger than num topics - 1. + pub fn broadcast(num_topics: usize, topic_idx: usize, known_topic: H256) -> Result { + if topic_idx >= num_topics { return Err("topic index out of bounds") } + + Ok(DecryptionInstance(DecryptionInner::AES(AesExtract::OnTopics(num_topics, topic_idx, known_topic)))) + } + + /// Decrypt ciphertext. Fails if it's an invalid message. + pub fn decrypt(self, ciphertext: &[u8]) -> Option> { + match self.0 { + DecryptionInner::AES(extract) => { + let decrypt = | + key: [u8; AES_KEY_LEN], + nonce: [u8; AES_NONCE_LEN], + ciphertext: &[u8] + | { + if ciphertext.len() < AES_256_GCM.tag_len() { return None } + + let opening_key = OpeningKey::new(&AES_256_GCM, &key) + .expect("key length is valid for mode; qed"); + + let mut buf = ciphertext.to_vec(); + + // decrypted plaintext always ends up at the + // front of the buffer. + let maybe_decrypted = aead::open_in_place( + &opening_key, + &nonce, + &[], // no authenticated data + 0, // no header. + &mut buf, + ).ok().map(|plain_slice| plain_slice.len()); + + maybe_decrypted.map(move |len| { buf.truncate(len); buf }) + }; + + match extract { + AesExtract::AppendedNonce(key) => { + if ciphertext.len() < AES_NONCE_LEN { return None } + + // nonce is the suffix of ciphertext. + let mut nonce = [0; AES_NONCE_LEN]; + let nonce_offset = ciphertext.len() - AES_NONCE_LEN; + + nonce.copy_from_slice(&ciphertext[nonce_offset..]); + decrypt(key, nonce, &ciphertext[..nonce_offset]) + } + AesExtract::OnTopics(num_topics, known_index, known_topic) => { + if ciphertext.len() < num_topics * 32 { return None } + + let mut salted_topic = H256::new(); + salted_topic.copy_from_slice(&ciphertext[(known_index * 32)..][..32]); + + let key = (salted_topic ^ known_topic).0; + + let offset = num_topics * 32; + decrypt(key, BROADCAST_IV, &ciphertext[offset..]) + } + } + } + DecryptionInner::ECIES(secret) => { + // secret is checked for validity, so only fails on invalid message. + ::ethcrypto::ecies::decrypt(&secret, &[], ciphertext).ok() + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn aes_key_len_should_be_equal_to_constant() { + assert_eq!(::ring::aead::AES_256_GCM.key_len(), AES_KEY_LEN); + } + + #[test] + fn aes_nonce_len_should_be_equal_to_constant() { + assert_eq!(::ring::aead::AES_256_GCM.nonce_len(), AES_NONCE_LEN); + } + + #[test] + fn encrypt_asymmetric() { + use ethkey::{Generator, Random}; + + let key_pair = Random.generate().unwrap(); + let test_message = move |message: &[u8]| { + let instance = EncryptionInstance::ecies(key_pair.public().clone()).unwrap(); + let ciphertext = instance.encrypt(&message); + + if !message.is_empty() { + assert!(&ciphertext[..message.len()] != message) + } + + let instance = DecryptionInstance::ecies(key_pair.secret().clone()).unwrap(); + let decrypted = instance.decrypt(&ciphertext).unwrap(); + + assert_eq!(message, &decrypted[..]) + }; + + test_message(&[1, 2, 3, 4, 5]); + test_message(&[]); + test_message(&[255; 512]); + } + + #[test] + fn encrypt_symmetric() { + use rand::{Rng, OsRng}; + + let mut rng = OsRng::new().unwrap(); + let mut test_message = move |message: &[u8]| { + let key = rng.gen(); + + let instance = EncryptionInstance::aes(key, rng.gen()); + let ciphertext = instance.encrypt(message); + + if !message.is_empty() { + assert!(&ciphertext[..message.len()] != message) + } + + let instance = DecryptionInstance::aes(key); + let decrypted = instance.decrypt(&ciphertext).unwrap(); + + assert_eq!(message, &decrypted[..]) + }; + + test_message(&[1, 2, 3, 4, 5]); + test_message(&[]); + test_message(&[255; 512]); + } + + #[test] + fn encrypt_broadcast() { + use rand::{Rng, OsRng}; + + let mut rng = OsRng::new().unwrap(); + + let mut test_message = move |message: &[u8]| { + let all_topics = (0..5).map(|_| rng.gen()).collect::>(); + let known_idx = 2; + let known_topic = all_topics[2]; + let key = rng.gen(); + + let instance = EncryptionInstance::broadcast(key, all_topics); + let ciphertext = instance.encrypt(message); + + if !message.is_empty() { + assert!(&ciphertext[..message.len()] != message) + } + + let instance = DecryptionInstance::broadcast(5, known_idx, known_topic).unwrap(); + + let decrypted = instance.decrypt(&ciphertext).unwrap(); + + assert_eq!(message, &decrypted[..]) + }; + + test_message(&[1, 2, 3, 4, 5]); + test_message(&[]); + test_message(&[255; 512]); + } +} diff --git a/whisper/src/rpc/filter.rs b/whisper/src/rpc/filter.rs new file mode 100644 index 000000000..2b0c7544d --- /dev/null +++ b/whisper/src/rpc/filter.rs @@ -0,0 +1,416 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Abstraction over filters which works with polling and subscription. + +use std::collections::HashMap; +use std::sync::{mpsc, Arc}; +use std::thread; + +use bigint::hash::{H256, H512}; +use ethkey::Public; +use jsonrpc_macros::pubsub::{Subscriber, Sink}; +use parking_lot::{Mutex, RwLock}; +use rand::{Rng, OsRng}; + +use message::{Message, Topic}; +use super::key_store::KeyStore; +use super::types::{self, FilterItem, HexEncode}; + +/// Kinds of filters, +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum Kind { + /// Polled filter only returns data upon request + Poll, + /// Subscription filter pushes data to subscriber immediately. + Subscription, +} + +pub type ItemBuffer = Arc>>; + +enum FilterEntry { + Poll(Arc, ItemBuffer), + Subscription(Arc, Sink), +} + +/// Filter manager. Handles filters as well as a thread for doing decryption +/// and payload decoding. +pub struct Manager { + key_store: Arc>, + filters: RwLock>, + tx: Mutex>>, + join: Option>, +} + +impl Manager { + /// Create a new filter manager that will dispatch decryption tasks onto + /// the given thread pool. + pub fn new() -> ::std::io::Result { + let (tx, rx) = mpsc::channel::>(); + let join_handle = thread::Builder::new() + .name("Whisper Decryption Worker".to_string()) + .spawn(move || for item in rx { (item)() })?; + + Ok(Manager { + key_store: Arc::new(RwLock::new(KeyStore::new()?)), + filters: RwLock::new(HashMap::new()), + tx: Mutex::new(tx), + join: Some(join_handle), + }) + } + + /// Get a handle to the key store. + pub fn key_store(&self) -> Arc> { + self.key_store.clone() + } + + /// Get filter kind if it's known. + pub fn kind(&self, id: &H256) -> Option { + self.filters.read().get(id).map(|filter| match *filter { + FilterEntry::Poll(_, _) => Kind::Poll, + FilterEntry::Subscription(_, _) => Kind::Subscription, + }) + } + + /// Remove filter by ID. + pub fn remove(&self, id: &H256) { + self.filters.write().remove(id); + } + + /// Add a new polled filter. + pub fn insert_polled(&self, filter: Filter) -> Result { + let buffer = Arc::new(Mutex::new(Vec::new())); + let entry = FilterEntry::Poll(Arc::new(filter), buffer); + let id = OsRng::new() + .map_err(|_| "unable to acquire secure randomness")? + .gen(); + + self.filters.write().insert(id, entry); + Ok(id) + } + + /// Insert new subscription filter. Generates a secure ID and sends it to + /// the + pub fn insert_subscription(&self, filter: Filter, sub: Subscriber) + -> Result<(), &'static str> + { + let id: H256 = OsRng::new() + .map_err(|_| "unable to acquire secure randomness")? + .gen(); + + sub.assign_id(::jsonrpc_pubsub::SubscriptionId::String(id.hex())) + .map(move |sink| { + let entry = FilterEntry::Subscription(Arc::new(filter), sink); + self.filters.write().insert(id, entry); + }) + .map_err(|_| "subscriber disconnected") + } + + /// Poll changes on filter identified by ID. + pub fn poll_changes(&self, id: &H256) -> Option> { + self.filters.read().get(id).and_then(|filter| match *filter { + FilterEntry::Subscription(_, _) => None, + FilterEntry::Poll(_, ref changes) + => Some(::std::mem::replace(&mut *changes.lock(), Vec::new())), + }) + } +} + +// machinery for attaching the manager to the network instance. +impl ::net::MessageHandler for Arc { + fn handle_messages(&self, messages: &[Message]) { + let filters = self.filters.read(); + let filters_iter = filters + .values() + .flat_map(|filter| messages.iter().map(move |msg| (filter, msg))) ; + + for (filter, message) in filters_iter { + // if the message matches any of the possible bloom filters, + // send to thread pool to attempt decryption and avoid + // blocking the network thread for long. + let failed_send = match *filter { + FilterEntry::Poll(ref filter, _) | FilterEntry::Subscription(ref filter, _) + if !filter.basic_matches(message) => None, + FilterEntry::Poll(ref filter, ref buffer) => { + let (message, key_store) = (message.clone(), self.key_store.clone()); + let (filter, buffer) = (filter.clone(), buffer.clone()); + + self.tx.lock().send(Box::new(move || { + filter.handle_message( + &message, + &*key_store, + |matched| buffer.lock().push(matched), + ) + })).err().map(|x| x.0) + } + FilterEntry::Subscription(ref filter, ref sink) => { + let (message, key_store) = (message.clone(), self.key_store.clone()); + let (filter, sink) = (filter.clone(), sink.clone()); + + self.tx.lock().send(Box::new(move || { + filter.handle_message( + &message, + &*key_store, + |matched| { let _ = sink.notify(Ok(matched)); }, + ) + })).err().map(|x| x.0) + } + }; + + // if we failed to send work, no option but to do it locally. + if let Some(local_work) = failed_send { + (local_work)() + } + } + } +} + +impl Drop for Manager { + fn drop(&mut self) { + if let Some(guard) = self.join.take() { + let _ = guard.join(); + } + } +} + +/// Filter incoming messages by critera. +pub struct Filter { + topics: Vec<(Vec, H512, Topic)>, + from: Option, + decrypt_with: Option, +} + +impl Filter { + /// Create a new filter from filter request. + /// + /// Fails if the topics vector is empty. + pub fn new(params: types::FilterRequest) -> Result { + if params.topics.is_empty() { + return Err("no topics for filter"); + } + + let topics: Vec<_> = params.topics.into_iter() + .map(|x| x.into_inner()) + .map(|topic| { + let abridged = super::abridge_topic(&topic); + (topic, abridged.bloom(), abridged) + }) + .collect(); + + Ok(Filter { + topics: topics, + from: params.from.map(|x| x.into_inner()), + decrypt_with: params.decrypt_with.map(|x| x.into_inner()), + }) + } + + // does basic matching: + // whether the given message matches at least one of the topics of the + // filter. + // TODO: minimum PoW heuristic. + fn basic_matches(&self, message: &Message) -> bool { + self.topics.iter().any(|&(_, ref bloom, _)| { + &(bloom & message.bloom()) == bloom + }) + } + + // handle a message that matches the bloom. + fn handle_message( + &self, + message: &Message, + store: &RwLock, + on_match: F, + ) { + use rpc::crypto::DecryptionInstance; + use tiny_keccak::keccak256; + + let matched_indices: Vec<_> = self.topics.iter() + .enumerate() + .filter_map(|(i, &(_, ref bloom, ref abridged))| { + let contains_topic = &(bloom & message.bloom()) == bloom + && message.topics().contains(abridged); + + if contains_topic { Some(i) } else { None } + }) + .collect(); + + if matched_indices.is_empty() { return } + + let decrypt = match self.decrypt_with { + Some(ref id) => match store.read().decryption_instance(id) { + Some(d) => d, + None => { + warn!(target: "whisper", "Filter attempted to decrypt with destroyed identity {}", + id); + + return + } + }, + None => { + let known_idx = matched_indices[0]; + let known_topic = H256(keccak256(&self.topics[0].0)); + + DecryptionInstance::broadcast(message.topics().len(), known_idx, known_topic) + .expect("known idx is within the range 0..message.topics.len(); qed") + } + }; + + let decrypted = match decrypt.decrypt(message.data()) { + Some(d) => d, + None => { + trace!(target: "whisper", "Failed to decrypt message with {} matching topics", + matched_indices.len()); + + return + } + }; + + match ::rpc::payload::decode(&decrypted) { + Ok(decoded) => { + if decoded.from != self.from { return } + + let matched_topics = matched_indices + .into_iter() + .map(|i| self.topics[i].0.clone()) + .map(HexEncode) + .collect(); + + on_match(FilterItem { + from: decoded.from.map(HexEncode), + recipient: self.decrypt_with.map(HexEncode), + ttl: message.envelope().ttl, + topics: matched_topics, + timestamp: message.envelope().expiry - message.envelope().ttl, + payload: HexEncode(decoded.message.to_vec()), + padding: decoded.padding.map(|pad| HexEncode(pad.to_vec())), + }) + } + Err(reason) => + trace!(target: "whisper", "Bad payload in decrypted message with {} topics: {}", + matched_indices.len(), reason), + } + } +} + +#[cfg(test)] +mod tests { + use message::{CreateParams, Message}; + use rpc::types::{FilterRequest, HexEncode}; + use rpc::abridge_topic; + use super::*; + + #[test] + fn rejects_empty_topics() { + let req = FilterRequest { + decrypt_with: Default::default(), + from: None, + topics: Vec::new(), + }; + + assert!(Filter::new(req).is_err()); + } + + #[test] + fn basic_match() { + let topics = vec![vec![1, 2, 3], vec![4, 5, 6]]; + let req = FilterRequest { + decrypt_with: Default::default(), + from: None, + topics: topics.iter().cloned().map(HexEncode).collect(), + }; + + let filter = Filter::new(req).unwrap(); + let message = Message::create(CreateParams { + ttl: 100, + payload: vec![1, 3, 5, 7, 9], + topics: topics.iter().map(|x| abridge_topic(&x)).collect(), + work: 0, + }); + + assert!(filter.basic_matches(&message)); + + let message = Message::create(CreateParams { + ttl: 100, + payload: vec![1, 3, 5, 7, 9], + topics: topics.iter().take(1).map(|x| abridge_topic(&x)).collect(), + work: 0, + }); + + assert!(filter.basic_matches(&message)); + + let message = Message::create(CreateParams { + ttl: 100, + payload: vec![1, 3, 5, 7, 9], + topics: Vec::new(), + work: 0, + }); + + assert!(!filter.basic_matches(&message)); + } + + #[test] + fn decrypt_and_decode() { + use rpc::payload::{self, EncodeParams}; + use rpc::key_store::{Key, KeyStore}; + + let mut store = KeyStore::new().unwrap(); + let signing_pair = Key::new_asymmetric(store.rng()); + let encrypting_key = Key::new_symmetric(store.rng()); + + let decrypt_id = store.insert(encrypting_key); + let encryption_instance = store.encryption_instance(&decrypt_id).unwrap(); + + let store = ::parking_lot::RwLock::new(store); + + let payload = payload::encode(EncodeParams { + message: &[1, 2, 3], + padding: Some(&[4, 5, 4, 5]), + sign_with: Some(signing_pair.secret().unwrap()) + }).unwrap(); + + let encrypted = encryption_instance.encrypt(&payload); + + let message = Message::create(CreateParams { + ttl: 100, + payload: encrypted, + topics: vec![abridge_topic(&[9; 32])], + work: 0, + }); + + let message2 = Message::create(CreateParams { + ttl: 100, + payload: vec![3, 5, 7, 9], + topics: vec![abridge_topic(&[9; 32])], + work: 0, + }); + + let filter = Filter::new(FilterRequest { + decrypt_with: Some(HexEncode(decrypt_id)), + from: Some(HexEncode(signing_pair.public().unwrap().clone())), + topics: vec![HexEncode(vec![9; 32])], + }).unwrap(); + + assert!(filter.basic_matches(&message)); + + let items = ::std::cell::Cell::new(0); + let on_match = |_| { items.set(items.get() + 1); }; + + filter.handle_message(&message, &store, &on_match); + filter.handle_message(&message2, &store, &on_match); + + assert_eq!(items.get(), 1); + } +} diff --git a/whisper/src/rpc/key_store.rs b/whisper/src/rpc/key_store.rs new file mode 100644 index 000000000..ac9e758d6 --- /dev/null +++ b/whisper/src/rpc/key_store.rs @@ -0,0 +1,197 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Identity and keystore for Whisper sessions. +//! +//! Can handle symmetric and asymmetric keys. +//! Symmetric encryption is done via AES-256 in GCM mode. + +use std::collections::HashMap; + +use bigint::hash::H256; +use ethkey::{KeyPair, Public, Secret}; +use rand::{Rng, OsRng}; +use ring::error::Unspecified; + +use rpc::crypto::{AES_KEY_LEN, EncryptionInstance, DecryptionInstance}; + +/// A symmetric or asymmetric key used for encryption, decryption, and signing +/// of payloads. +pub enum Key { + /// ECIES key pair for Secp2561k curve. Suitable for encryption, decryption, + /// and signing. + Asymmetric(KeyPair), + /// AES-256 GCM mode. Suitable for encryption, decryption, but not signing. + Symmetric([u8; AES_KEY_LEN]), +} + +impl Key { + /// Generate a random asymmetric key with the given cryptographic RNG. + pub fn new_asymmetric(rng: &mut OsRng) -> Self { + match ::ethkey::Generator::generate(rng) { + Ok(pair) => Key::Asymmetric(pair), + Err(void) => match void {}, + } + } + + /// Generate a random symmetric key with the given cryptographic RNG. + pub fn new_symmetric(rng: &mut OsRng) -> Self { + Key::Symmetric(rng.gen()) + } + + /// From secret asymmetric key. Fails if secret is invalid. + pub fn from_secret(secret: Secret) -> Result { + KeyPair::from_secret(secret) + .map(Key::Asymmetric) + .map_err(|_| Unspecified) + } + + /// From raw symmetric key. + pub fn from_raw_symmetric(key: [u8; AES_KEY_LEN]) -> Self { + Key::Symmetric(key) + } + + /// Get a handle to the public key if this is an asymmetric key. + pub fn public(&self) -> Option<&Public> { + match *self { + Key::Asymmetric(ref pair) => Some(pair.public()), + Key::Symmetric(_) => None, + } + } + + /// Get a handle to the secret key if this is an asymmetric key. + pub fn secret(&self) -> Option<&Secret> { + match *self { + Key::Asymmetric(ref pair) => Some(pair.secret()), + Key::Symmetric(_) => None, + } + } + + /// Get a handle to the symmetric key. + pub fn symmetric(&self) -> Option<&[u8; AES_KEY_LEN]> { + match *self { + Key::Asymmetric(_) => None, + Key::Symmetric(ref key) => Some(key), + } + } +} + +/// Key store. +pub struct KeyStore { + rng: OsRng, + identities: HashMap, +} + +impl KeyStore { + /// Create the key store. Returns any error in accessing the system's secure + /// RNG. + pub fn new() -> Result { + Ok(KeyStore { + rng: OsRng::new()?, + identities: HashMap::new(), + }) + } + + /// Import a key, generating a random identity for it. + pub fn insert(&mut self, key: Key) -> H256 { + let id = self.rng().gen(); + self.identities.insert(id, key); + + id + } + + /// Get a key by ID. + pub fn get<'a>(&'a self, id: &H256) -> Option<&'a Key> { + self.identities.get(id) + } + + /// Get asymmetric ID's public key. + pub fn public<'a>(&'a self, id: &H256) -> Option<&'a Public> { + self.get(id).and_then(Key::public) + } + + /// Get asymmetric ID's secret key. + pub fn secret<'a>(&'a self, id: &H256) -> Option<&'a Secret> { + self.get(id).and_then(Key::secret) + } + + /// Get symmetric ID's key. + pub fn symmetric<'a>(&'a self, id: &H256) -> Option<&'a [u8; AES_KEY_LEN]> { + self.get(id).and_then(Key::symmetric) + } + + /// Get encryption instance for identity. + pub fn encryption_instance(&self, id: &H256) -> Result { + self.get(id).ok_or("no such identity").and_then(|key| match *key { + Key::Asymmetric(ref pair) => EncryptionInstance::ecies(pair.public().clone()) + .map_err(|_| "could not create encryption instance for id"), + Key::Symmetric(ref key) => + OsRng::new() + .map(|mut rng| EncryptionInstance::aes(key.clone(), rng.gen())) + .map_err(|_| "unable to get secure randomness") + }) + } + + /// Get decryption instance for identity. + /// If the identity is known, always succeeds. + pub fn decryption_instance(&self, id: &H256) -> Option { + self.get(id).map(|key| match *key { + Key::Asymmetric(ref pair) => DecryptionInstance::ecies(pair.secret().clone()) + .expect("all keys stored are valid; qed"), + Key::Symmetric(ref key) => DecryptionInstance::aes(key.clone()), + }) + } + + /// Whether the store contains a key by this ID. + pub fn contains(&self, id: &H256) -> bool { + self.identities.contains_key(id) + } + + /// Remove a key by ID. + pub fn remove(&mut self, id: &H256) -> bool { + self.identities.remove(id).is_some() + } + + /// Get RNG. + pub fn rng(&mut self) -> &mut OsRng { + &mut self.rng + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn rejects_invalid_secret() { + let bad_secret = ::ethkey::Secret::from_slice(&[0xff; 32]); + assert!(Key::from_secret(bad_secret).is_err()); + } + + #[test] + fn generated_key_should_exist() { + let mut store = KeyStore::new().unwrap(); + let key = Key::new_asymmetric(store.rng()); + + assert!(key.public().is_some()); + assert!(key.secret().is_some()); + + let id = store.insert(key); + + assert!(store.contains(&id)); + assert!(store.get(&id).is_some()); + } +} diff --git a/whisper/src/rpc/mod.rs b/whisper/src/rpc/mod.rs new file mode 100644 index 000000000..31fe997d5 --- /dev/null +++ b/whisper/src/rpc/mod.rs @@ -0,0 +1,402 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! JSONRPC interface for Whisper. +//! +//! Manages standard message format decoding, ephemeral identities, signing, +//! encryption, and decryption. +//! +//! Provides an interface for using whisper to transmit data securely. + +use std::sync::Arc; + +use jsonrpc_core::{Error, ErrorCode, Metadata}; +use jsonrpc_pubsub::{Session, PubSubMetadata, SubscriptionId}; +use jsonrpc_macros::pubsub; + +use bigint::hash::H256; +use futures::{future, BoxFuture}; +use parking_lot::RwLock; + +use self::filter::Filter; +use self::key_store::{Key, KeyStore}; +use self::types::HexEncode; + +use message::{CreateParams, Message, Topic}; + +mod crypto; +mod filter; +mod key_store; +mod payload; +mod types; + +pub use self::filter::Manager as FilterManager; + +// create whisper RPC error. +fn whisper_error>(message: T) -> Error { + const ERROR_CODE: i64 = -32085; + + Error { + code: ErrorCode::ServerError(ERROR_CODE), + message: message.into(), + data: None, + } +} + +fn topic_hash(topic: &[u8]) -> H256 { + H256(::tiny_keccak::keccak256(topic)) +} + +// abridge topic using first four bytes of hash. +fn abridge_topic(topic: &[u8]) -> Topic { + let mut abridged = [0; 4]; + let hash = topic_hash(topic).0; + abridged.copy_from_slice(&hash[..4]); + abridged.into() +} + +build_rpc_trait! { + /// Whisper RPC interface. + pub trait Whisper { + /// Info about the node. + #[rpc(name = "shh_info")] + fn info(&self) -> Result; + + /// Generate a new asymmetric key pair and return an identity. + #[rpc(name = "shh_newKeyPair")] + fn new_key_pair(&self) -> Result; + + /// Import the given SECP2561k private key and return an identity. + #[rpc(name = "shh_addPrivateKey")] + fn add_private_key(&self, types::Private) -> Result; + + /// Generate a new symmetric key and return an identity. + #[rpc(name = "shh_newSymKey")] + fn new_sym_key(&self) -> Result; + + /// Import the given symmetric key and return an identity. + #[rpc(name = "shh_addSymKey")] + fn add_sym_key(&self, types::Symmetric) -> Result; + + /// Get public key. Succeeds if identity is stored and asymmetric. + #[rpc(name = "shh_getPublicKey")] + fn get_public(&self, types::Identity) -> Result; + + /// Get private key. Succeeds if identity is stored and asymmetric. + #[rpc(name = "shh_getPrivateKey")] + fn get_private(&self, types::Identity) -> Result; + + #[rpc(name = "shh_getSymKey")] + fn get_symmetric(&self, types::Identity) -> Result; + + /// Delete key pair denoted by given identity. + /// + /// Return true if successfully removed, false if unknown, + /// and error otherwise. + #[rpc(name = "shh_deleteKey")] + fn remove_key(&self, types::Identity) -> Result; + + /// Post a message to the network with given parameters. + #[rpc(name = "shh_post")] + fn post(&self, types::PostRequest) -> Result; + + /// Create a new polled filter. + #[rpc(name = "shh_newMessageFilter")] + fn new_filter(&self, types::FilterRequest) -> Result; + + /// Poll changes on a polled filter. + #[rpc(name = "shh_getFilterMessages")] + fn poll_changes(&self, types::Identity) -> Result, Error>; + + /// Delete polled filter. Return bool indicating success. + #[rpc(name = "shh_deleteMessageFilter")] + fn delete_filter(&self, types::Identity) -> Result; + } +} + +build_rpc_trait! { + /// Whisper RPC pubsub. + pub trait WhisperPubSub { + type Metadata; + + #[pubsub(name = "hello")] { + /// Subscribe to messages matching the filter. + #[rpc(name = "ssh_subscribe")] + fn subscribe(&self, Self::Metadata, pubsub::Subscriber, types::FilterRequest); + + /// Unsubscribe from filter matching given ID. Return + /// true on success, error otherwise. + #[rpc(name = "shh_unsubscribe")] + fn unsubscribe(&self, SubscriptionId) -> BoxFuture; + } + } +} + +/// Something which can send messages to the network. +pub trait PoolHandle: Send + Sync { + /// Give message to the whisper network for relay. + /// Returns false if PoW too low. + fn relay(&self, message: Message) -> bool; + + /// Number of messages and memory used by resident messages. + fn pool_status(&self) -> ::net::PoolStatus; +} + +impl PoolHandle for ::net::PoolHandle { + fn relay(&self, message: Message) -> bool { + self.post_message(message) + } + + fn pool_status(&self) -> ::net::PoolStatus { + ::net::PoolHandle::pool_status(self) + } +} + +/// Default, simple metadata implementation. +#[derive(Clone, Default)] +pub struct Meta { + session: Option>, +} + +impl Metadata for Meta {} +impl PubSubMetadata for Meta { + fn session(&self) -> Option> { + self.session.clone() + } +} + +/// Implementation of whisper RPC. +pub struct WhisperClient { + store: Arc>, + pool: P, + filter_manager: Arc, + _meta: ::std::marker::PhantomData, +} + +impl

WhisperClient

{ + /// Create a new whisper client with basic metadata. + pub fn with_simple_meta(pool: P, filter_manager: Arc) -> Self { + WhisperClient::new(pool, filter_manager) + } +} + +impl WhisperClient { + /// Create a new whisper client. + pub fn new(pool: P, filter_manager: Arc) -> Self { + WhisperClient { + store: filter_manager.key_store(), + pool: pool, + filter_manager: filter_manager, + _meta: ::std::marker::PhantomData, + } + } + + fn delete_filter_kind(&self, id: H256, kind: filter::Kind) -> bool { + match self.filter_manager.kind(&id) { + Some(k) if k == kind => { + self.filter_manager.remove(&id); + true + } + None | Some(_) => false, + } + } +} + +impl Whisper for WhisperClient { + fn info(&self) -> Result { + let status = self.pool.pool_status(); + + Ok(types::NodeInfo { + required_pow: status.required_pow, + messages: status.message_count, + memory: status.cumulative_size, + target_memory: status.target_size, + }) + } + + fn new_key_pair(&self) -> Result { + let mut store = self.store.write(); + let key_pair = Key::new_asymmetric(store.rng()); + + Ok(HexEncode(store.insert(key_pair))) + } + + fn add_private_key(&self, private: types::Private) -> Result { + let key_pair = Key::from_secret(private.into_inner().into()) + .map_err(|_| whisper_error("Invalid private key"))?; + + Ok(HexEncode(self.store.write().insert(key_pair))) + } + + fn new_sym_key(&self) -> Result { + let mut store = self.store.write(); + let key = Key::new_symmetric(store.rng()); + + Ok(HexEncode(store.insert(key))) + } + + fn add_sym_key(&self, raw_key: types::Symmetric) -> Result { + let raw_key = raw_key.into_inner().0; + let key = Key::from_raw_symmetric(raw_key); + + Ok(HexEncode(self.store.write().insert(key))) + } + + fn get_public(&self, id: types::Identity) -> Result { + self.store.read().public(&id.into_inner()) + .cloned() + .map(HexEncode) + .ok_or_else(|| whisper_error("Unknown identity")) + } + + fn get_private(&self, id: types::Identity) -> Result { + self.store.read().secret(&id.into_inner()) + .map(|x| (&**x).clone()) + .map(HexEncode) + .ok_or_else(|| whisper_error("Unknown identity")) + } + + fn get_symmetric(&self, id: types::Identity) -> Result { + self.store.read().symmetric(&id.into_inner()) + .cloned() + .map(H256) + .map(HexEncode) + .ok_or_else(|| whisper_error("Unknown identity")) + } + + fn remove_key(&self, id: types::Identity) -> Result { + Ok(self.store.write().remove(&id.into_inner())) + } + + fn post(&self, req: types::PostRequest) -> Result { + use self::crypto::EncryptionInstance; + + let encryption = match req.to { + Some(types::Receiver::Public(public)) => EncryptionInstance::ecies(public.into_inner()) + .map_err(whisper_error)?, + Some(types::Receiver::Identity(id)) => self.store.read().encryption_instance(&id.into_inner()) + .map_err(whisper_error)?, + None => { + use rand::{Rng, OsRng}; + + // broadcast mode: use fixed nonce and fresh key each time. + + let mut rng = OsRng::new() + .map_err(|_| whisper_error("unable to acquire secure randomness"))?; + + let key = rng.gen(); + if req.topics.is_empty() { + return Err(whisper_error("must supply at least one topic for broadcast message")); + } + + EncryptionInstance::broadcast( + key, + req.topics.iter().map(|x| topic_hash(&x)).collect() + ) + } + }; + + let sign_with = match req.from { + Some(from) => { + Some( + self.store.read().secret(&from.into_inner()) + .cloned() + .ok_or_else(|| whisper_error("Unknown identity `from`"))? + ) + } + None => None, + }; + + let encrypted = { + let payload = payload::encode(payload::EncodeParams { + message: &req.payload.into_inner(), + padding: req.padding.map(|p| p.into_inner()).as_ref().map(|x| &x[..]), + sign_with: sign_with.as_ref(), + }).map_err(whisper_error)?; + + encryption.encrypt(&payload) + }; + + // mining the packet is the heaviest item of work by far. + // there may be a benefit to dispatching this onto the CPU pool + // and returning a future. but then things get _less_ efficient + // if the server infrastructure has more threads than the CPU pool. + let message = Message::create(CreateParams { + ttl: req.ttl, + payload: encrypted, + topics: req.topics.into_iter().map(|x| abridge_topic(&x.into_inner())).collect(), + work: req.priority, + }); + + if !self.pool.relay(message) { + Err(whisper_error("PoW too low to compete with other messages")) + } else { + Ok(true) + } + } + + fn new_filter(&self, req: types::FilterRequest) -> Result { + let filter = Filter::new(req).map_err(whisper_error)?; + + self.filter_manager.insert_polled(filter) + .map(HexEncode) + .map_err(whisper_error) + } + + fn poll_changes(&self, id: types::Identity) -> Result, Error> { + match self.filter_manager.poll_changes(&id.into_inner()) { + None => Err(whisper_error("no such message filter")), + Some(items) => Ok(items), + } + } + + fn delete_filter(&self, id: types::Identity) -> Result { + Ok(self.delete_filter_kind(id.into_inner(), filter::Kind::Poll)) + } +} + +impl WhisperPubSub for WhisperClient { + type Metadata = M; + + fn subscribe( + &self, + _meta: Self::Metadata, + subscriber: pubsub::Subscriber, + req: types::FilterRequest, + ) { + match Filter::new(req) { + Ok(filter) => { + if let Err(e) = self.filter_manager.insert_subscription(filter, subscriber) { + debug!(target: "whisper", "Failed to add subscription: {}", e); + } + } + Err(reason) => { let _ = subscriber.reject(whisper_error(reason)); } + } + } + + fn unsubscribe(&self, id: SubscriptionId) -> BoxFuture { + use std::str::FromStr; + + let res = match id { + SubscriptionId::String(s) => H256::from_str(&s) + .map_err(|_| "unrecognized ID") + .map(|id| self.delete_filter_kind(id, filter::Kind::Subscription)), + SubscriptionId::Number(_) => Err("unrecognized ID"), + }; + + Box::new(future::done(res.map_err(whisper_error))) + } +} diff --git a/whisper/src/rpc/payload.rs b/whisper/src/rpc/payload.rs new file mode 100644 index 000000000..b68f55a29 --- /dev/null +++ b/whisper/src/rpc/payload.rs @@ -0,0 +1,357 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Common payload format definition, construction, and decoding. +//! +//! Format: +//! flags: 1 byte +//! +//! payload size: 0..4 bytes, BE, determined by flags. +//! optional padding: byte array up to 2^24 bytes in length. encoded in payload size. +//! optional signature: 65 bytes (r, s, v) +//! +//! payload: byte array of length of arbitrary size. +//! +//! flag bits used: +//! 0, 1 => how many bytes indicate padding length (up to 3) +//! 2 => whether signature is present +//! +//! padding is used to mask information about size of message. +//! +//! AES-256-GCM will append 12 bytes of metadata to the front of the message. + +use bigint::hash::H256; +use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; +use ethkey::{Public, Secret}; +use tiny_keccak::keccak256; + +const SIGNATURE_LEN: usize = 65; + +const STANDARD_PAYLOAD_VERSION: u8 = 1; + +bitflags! { + struct Flags: u8 { + const FLAG_PAD_LEN_HIGH = 0b10000000; + const FLAG_PAD_LEN_LOW = 0b01000000; + const FLAG_SIGNED = 0b00100000; + } +} + +// number of bytes of padding length (in the range 0..4) +fn padding_length_bytes(flags: Flags) -> usize { + match (flags & FLAG_PAD_LEN_HIGH, flags & FLAG_PAD_LEN_LOW) { + (FLAG_PAD_LEN_HIGH, FLAG_PAD_LEN_LOW) => 3, + (FLAG_PAD_LEN_HIGH, _) => 2, + (_, FLAG_PAD_LEN_LOW) => 1, + (_, _) => 0, + } +} + +// how many bytes are necessary to encode the given length. Range 0..4. +// `None` if too large. +fn num_padding_length_bytes(padding_len: usize) -> Option { + let bits = 64 - (padding_len as u64).leading_zeros(); + match bits { + 0 => Some(0), + 0 ... 8 => Some(1), + 0 ... 16 => Some(2), + 0 ... 24 => Some(3), + _ => None, + } +} + +/// Parameters for encoding a standard payload. +pub struct EncodeParams<'a> { + /// Message to encode. + pub message: &'a [u8], + /// Padding bytes. Maximum padding allowed is 65536 bytes. + pub padding: Option<&'a [u8]>, + /// Private key to sign with. + pub sign_with: Option<&'a Secret>, +} + +impl<'a> Default for EncodeParams<'a> { + fn default() -> Self { + EncodeParams { + message: &[], + padding: None, + sign_with: None, + } + } +} + +/// Parameters for decoding a standard payload. +pub struct Decoded<'a> { + /// Decoded message. + pub message: &'a [u8], + /// optional padding. + pub padding: Option<&'a [u8]>, + /// Recovered signature. + pub from: Option, +} + +/// Encode using provided parameters. +pub fn encode(params: EncodeParams) -> Result, &'static str> { + const VEC_WRITE_INFALLIBLE: &'static str = "writing to a Vec can never fail; qed"; + + let padding_len = params.padding.map_or(0, |x| x.len()); + let padding_len_bytes = num_padding_length_bytes(padding_len) + .ok_or_else(|| "padding size too long")?; + + let signature = params.sign_with.map(|secret| { + let hash = H256(keccak256(params.message)); + ::ethkey::sign(secret, &hash) + }); + + let signature = match signature { + Some(Ok(sig)) => Some(sig), + Some(Err(_)) => return Err("invalid signing key provided"), + None => None, + }; + + let (flags, plaintext_size) = { + let mut flags = Flags::empty(); + + // 1 byte each for flags and version. + let mut plaintext_size = 2 + + padding_len_bytes + + padding_len + + params.message.len(); + + flags.bits = (padding_len_bytes << 6) as u8; + debug_assert_eq!(padding_length_bytes(flags), padding_len_bytes); + + if let Some(ref sig) = signature { + plaintext_size += sig.len(); + flags |= FLAG_SIGNED; + } + + (flags, plaintext_size) + }; + + let mut plaintext = Vec::with_capacity(plaintext_size); + + plaintext.push(STANDARD_PAYLOAD_VERSION); + plaintext.push(flags.bits); + + if let Some(padding) = params.padding { + plaintext.write_uint::(padding_len as u64, padding_len_bytes) + .expect(VEC_WRITE_INFALLIBLE); + + plaintext.extend(padding) + } + + if let Some(signature) = signature { + plaintext.extend(signature.r()); + plaintext.extend(signature.s()); + plaintext.push(signature.v()); + } + + plaintext.extend(params.message); + + Ok(plaintext) +} + +/// Decode using provided parameters +pub fn decode(payload: &[u8]) -> Result { + let mut offset = 0; + + let (padding, signature) = { + // use a closure for reading slices since std::io::Read would require + // us to copy. + let mut next_slice = |len| { + let end = offset + len; + if payload.len() >= end { + let slice = &payload[offset .. end]; + offset = end; + + Ok(slice) + } else { + return Err("unexpected end of payload") + } + }; + + + if next_slice(1)?[0] != STANDARD_PAYLOAD_VERSION { + return Err("unknown payload version."); + } + + let flags = Flags::from_bits_truncate(next_slice(1)?[0]); + + let padding_len_bytes = padding_length_bytes(flags); + let padding = if padding_len_bytes != 0 { + let padding_len = BigEndian::read_uint( + next_slice(padding_len_bytes)?, + padding_len_bytes, + ); + + Some(next_slice(padding_len as usize)?) + } else { + None + }; + + let signature = if flags & FLAG_SIGNED == FLAG_SIGNED { + let slice = next_slice(SIGNATURE_LEN)?; + let mut arr = [0; SIGNATURE_LEN]; + + arr.copy_from_slice(slice); + let signature = ::ethkey::Signature::from(arr); + + let not_rsv = signature.r() != &slice[..32] + || signature.s() != &slice[32..64] + || signature.v() != slice[64]; + + if not_rsv { + return Err("signature not in RSV format"); + } else { + Some(signature) + } + } else { + None + }; + + (padding, signature) + }; + + // remaining data is the message. + let message = &payload[offset..]; + + let from = match signature { + None => None, + Some(sig) => { + let hash = H256(keccak256(message)); + Some(::ethkey::recover(&sig, &hash).map_err(|_| "invalid signature")?) + } + }; + + Ok(Decoded { + message: message, + padding: padding, + from: from, + }) +} + +#[cfg(test)] +mod tests { + use ethkey::{Generator, Random}; + use super::*; + + #[test] + fn padding_len_bytes_sanity() { + const U24_MAX: usize = (1 << 24) - 1; + + assert_eq!(padding_length_bytes(FLAG_PAD_LEN_HIGH | FLAG_PAD_LEN_LOW), 3); + assert_eq!(padding_length_bytes(FLAG_PAD_LEN_HIGH), 2); + assert_eq!(padding_length_bytes(FLAG_PAD_LEN_LOW), 1); + assert_eq!(padding_length_bytes(Flags::empty()), 0); + + assert!(num_padding_length_bytes(u32::max_value() as _).is_none()); + assert!(num_padding_length_bytes(U24_MAX + 1).is_none()); + + assert_eq!(num_padding_length_bytes(U24_MAX), Some(3)); + + assert_eq!(num_padding_length_bytes(u16::max_value() as usize + 1), Some(3)); + assert_eq!(num_padding_length_bytes(u16::max_value() as usize), Some(2)); + + assert_eq!(num_padding_length_bytes(u8::max_value() as usize + 1), Some(2)); + assert_eq!(num_padding_length_bytes(u8::max_value() as usize), Some(1)); + + assert_eq!(num_padding_length_bytes(1), Some(1)); + assert_eq!(num_padding_length_bytes(0), Some(0)); + } + + #[test] + fn encode_decode_roundtrip() { + let message = [1, 2, 3, 4, 5]; + let encoded = encode(EncodeParams { + message: &message, + padding: None, + sign_with: None, + }).unwrap(); + + let decoded = decode(&encoded).unwrap(); + + assert_eq!(message, decoded.message); + } + + #[test] + fn encode_empty() { + let encoded = encode(EncodeParams { + message: &[], + padding: None, + sign_with: None, + }).unwrap(); + + let decoded = decode(&encoded).unwrap(); + + assert!(decoded.message.is_empty()); + } + + #[test] + fn encode_with_signature() { + let key_pair = Random.generate().unwrap(); + let message = [1, 3, 5, 7, 9]; + + let encoded = encode(EncodeParams { + message: &message, + padding: None, + sign_with: Some(key_pair.secret()), + }).unwrap(); + + let decoded = decode(&encoded).unwrap(); + + assert_eq!(decoded.message, message); + assert_eq!(decoded.from, Some(key_pair.public().clone())); + assert!(decoded.padding.is_none()); + } + + #[test] + fn encode_with_padding() { + let message = [1, 3, 5, 7, 9]; + let padding = [0xff; 1024 - 5]; + + let encoded = encode(EncodeParams { + message: &message, + padding: Some(&padding), + sign_with: None, + }).unwrap(); + + let decoded = decode(&encoded).unwrap(); + + assert_eq!(decoded.message, message); + assert_eq!(decoded.padding, Some(&padding[..])); + assert!(decoded.from.is_none()); + } + + #[test] + fn encode_with_padding_and_signature() { + let key_pair = Random.generate().unwrap(); + let message = [1, 3, 5, 7, 9]; + let padding = [0xff; 1024 - 5]; + + let encoded = encode(EncodeParams { + message: &message, + padding: Some(&padding), + sign_with: Some(key_pair.secret()), + }).unwrap(); + + let decoded = decode(&encoded).unwrap(); + + assert_eq!(decoded.message, message); + assert_eq!(decoded.padding, Some(&padding[..])); + assert_eq!(decoded.from, Some(key_pair.public().clone())); + } +} diff --git a/whisper/src/rpc/types.rs b/whisper/src/rpc/types.rs new file mode 100644 index 000000000..ac06d69d5 --- /dev/null +++ b/whisper/src/rpc/types.rs @@ -0,0 +1,298 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Types for Whisper RPC. + +use std::fmt; +use std::ops::Deref; + +use bigint::hash::*; +use hex::{ToHex, FromHex}; + +use serde::{Serialize, Serializer, Deserialize, Deserializer}; +use serde::de::{Error, Visitor}; + +/// Helper trait for generic hex bytes encoding. +pub trait HexEncodable: Sized + ::std::ops::Deref { + fn from_bytes(bytes: Vec) -> Option; +} + +impl HexEncodable for Vec { + fn from_bytes(bytes: Vec) -> Option { Some(bytes) } +} + +macro_rules! impl_hex_for_hash { + ($($t: ident)*) => { + $( + impl HexEncodable for $t { + fn from_bytes(bytes: Vec) -> Option { + if bytes.len() != $t::len() { + None + } else { + Some($t::from_slice(&bytes)) + } + } + } + )* + } +} + +impl_hex_for_hash!( + H32 H64 H128 H256 H264 H512 H1024 +); + +/// Wrapper structure around hex-encoded data. +#[derive(Debug, PartialEq, Eq, Default, Hash, Clone)] +pub struct HexEncode(pub T); + +impl From for HexEncode { + fn from(x: T) -> Self { + HexEncode(x) + } +} + +impl HexEncode { + /// Create a new wrapper from the inner value. + pub fn new(x: T) -> Self { HexEncode(x) } + + /// Consume the wrapper, yielding the inner value. + pub fn into_inner(self) -> T { self.0 } +} + +impl Deref for HexEncode { + type Target = T; + + fn deref(&self) -> &T { &self.0 } +} + +/// Hex-encoded arbitrary-byte vector. +pub type Bytes = HexEncode>; + +/// 32-byte local identity +pub type Identity = HexEncode; + +/// Public key for ECIES, SECP256k1 +pub type Public = HexEncode<::ethkey::Public>; + +/// Unvalidated private key for ECIES, SECP256k1 +pub type Private = HexEncode; + +/// Abridged topic is four bytes. +// only used in tests for now. +#[cfg(test)] +pub type AbridgedTopic = HexEncode; + +/// 32-byte AES key. +pub type Symmetric = HexEncode; + +impl Serialize for HexEncode { + fn serialize(&self, serializer: S) -> Result { + let data = &self.0[..]; + let serialized = "0x".to_owned() + &data.to_hex(); + + serializer.serialize_str(serialized.as_ref()) + } +} + +impl<'a, T: 'a + HexEncodable> Deserialize<'a> for HexEncode { + fn deserialize(deserializer: D) -> Result + where D: Deserializer<'a> + { + deserializer.deserialize_any(HexEncodeVisitor::(::std::marker::PhantomData)) + } +} + +// helper type for decoding anything from hex. +struct HexEncodeVisitor(::std::marker::PhantomData); + +impl<'a, T: HexEncodable> Visitor<'a> for HexEncodeVisitor { + type Value = HexEncode; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + write!(formatter, "a 0x-prefixed, hex-encoded vector of bytes") + } + + fn visit_str(self, value: &str) -> Result { + let decoded = if value.len() >= 2 && &value[0..2] == "0x" && value.len() & 1 == 0 { + Ok(Vec::from_hex(&value[2..]).map_err(|_| Error::custom("invalid hex"))?) + } else { + Err(Error::custom("invalid format")) + }; + + decoded + .and_then(|x| T::from_bytes(x).ok_or(Error::custom("invalid format"))) + .map(HexEncode) + } + + fn visit_string(self, value: String) -> Result where E: Error { + self.visit_str(value.as_ref()) + } +} + +/// Receiver of a message. Either a public key, identity (presumably symmetric), +/// or broadcast over the topics. +#[derive(Deserialize)] +pub enum Receiver { + #[serde(rename="public")] + Public(Public), + #[serde(rename="identity")] + Identity(Identity), +} + +/// A request to post a message to the whisper network. +#[derive(Deserialize)] +pub struct PostRequest { + /// Receiver of the message. Either a public key or + /// an identity. If the identity is symmetric, it will + /// encrypt to that identity. + /// + /// If the receiver is missing, this will be a broadcast message. + pub to: Option, + + /// Sender of the message. + /// + /// If present, the payload will be signed by this + /// identity. The call will fail if the whisper node doesn't store the + /// signing key for this identity. + #[serde(skip_serializing_if = "Option::is_none")] + pub from: Option, + + /// Full topics to identify a message by. + /// At least one topic must be specified if the receiver is + /// not specified. + pub topics: Vec, + + /// Payload of the message + pub payload: Bytes, + + /// Optional padding of the message. No larger than 2^24 - 1. + pub padding: Option, + + /// Priority of the message: how many milliseconds to spend doing PoW + pub priority: u64, + + /// Time-To-Live of the message in seconds. + pub ttl: u64, +} + +/// Request for filter or subscription creation. +#[derive(Deserialize)] +pub struct FilterRequest { + /// ID of key used for decryption. + /// + /// If this identity is removed, then no further messages will be returned. + /// + /// If optional, this will listen for broadcast messages. + #[serde(rename = "decryptWith")] + pub decrypt_with: Option, + + /// Accept only messages signed by given public key. + pub from: Option, + + /// Possible topics. Cannot be empty if the identity is `None` + pub topics: Vec, +} + +/// A message captured by a filter or subscription. +#[derive(Serialize, Clone)] +pub struct FilterItem { + /// Public key that signed this message. + #[serde(skip_serializing_if = "Option::is_none")] + pub from: Option, + + /// Identity of recipient. If the filter wasn't registered with a + /// recipient, this will be `None`. + #[serde(skip_serializing_if = "Option::is_none")] + pub recipient: Option, + + /// Time to live in seconds. + pub ttl: u64, + + /// Abridged topics that matched the filter. + pub topics: Vec, + + /// Unix timestamp of the message generation. + pub timestamp: u64, + + /// Decrypted/Interpreted payload. + pub payload: Bytes, + + /// Optional padding data. + #[serde(skip_serializing_if = "Option::is_none")] + pub padding: Option, +} + +/// Whisper node info. +#[derive(Serialize)] +pub struct NodeInfo { + /// min PoW to be accepted into the local pool. + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "minPow")] + pub required_pow: Option, + + /// Number of messages in the pool. + pub messages: usize, + + /// Memory used by messages in the pool. + pub memory: usize, + + /// Target memory of the pool. + #[serde(rename = "targetMemory")] + pub target_memory: usize, +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json; + use hex::FromHex; + + #[test] + fn test_bytes_serialize() { + let bytes = Bytes::new(Vec::from_hex("0123456789abcdef").unwrap()); + let serialized = serde_json::to_string(&bytes).unwrap(); + assert_eq!(serialized, r#""0x0123456789abcdef""#); + } + + #[test] + fn test_bytes_deserialize() { + let bytes2: Result = serde_json::from_str(r#""0x123""#); + let bytes3: Result = serde_json::from_str(r#""0xgg""#); + + let bytes4: Bytes = serde_json::from_str(r#""0x""#).unwrap(); + let bytes5: Bytes = serde_json::from_str(r#""0x12""#).unwrap(); + let bytes6: Bytes = serde_json::from_str(r#""0x0123""#).unwrap(); + + assert!(bytes2.is_err()); + assert!(bytes3.is_err()); + assert_eq!(bytes4, Bytes::new(vec![])); + assert_eq!(bytes5, Bytes::new(vec![0x12])); + assert_eq!(bytes6, Bytes::new(vec![0x1, 0x23])); + } + + #[test] + fn deserialize_topic() { + let topic = AbridgedTopic::new([1, 2, 3, 15].into()); + + let topic1: Result = serde_json::from_str(r#""0x010203""#); + let topic2: Result = serde_json::from_str(r#""0102030F""#); + let topic3: AbridgedTopic = serde_json::from_str(r#""0x0102030F""#).unwrap(); + + assert!(topic1.is_err()); + assert!(topic2.is_err()); + assert_eq!(topic3, topic); + } +}