Initial Whisper implementation (#6009)

* whisper skeleton

* basic message store

* rallying and message logic

* pass host info to network protocol handlers

* choose who starts rally based on node key

* module reshuffling

* mining messages

* prune messages by low PoW until below size target

* associated error type for ethkey generators and `OsRng` generator

* beginnings of RPC

* generic message handler for whisper

* reshuffle code order

* standard payload encoding and decoding

* basic crypto

* minor restructuring of net code

* implement shh_post

* merge?

* implement filters

* rand trait for hash types

* filter RPCs for whisper

* symmetric encryption of payload

* pub-sub

* filter tests

* use only secure random IDs

* attach arbitrary protocols to network

* basic integration of whisper into Parity

* eagerly prune low PoW entries

* broadcast messages with salted topics

* node info RPC

* fix import

* fix leading zeros calculation

* address minor grumbles
This commit is contained in:
Robert Habermeier
2017-07-14 20:40:28 +02:00
committed by Gav Wood
parent a4fa6a3ac7
commit 99075ad22a
37 changed files with 3642 additions and 55 deletions

View File

@@ -144,3 +144,7 @@ jit = false
logging = "own_tx=trace"
log_file = "/var/log/parity.log"
color = true
[whisper]
enabled = false
pool_size = 20

View File

@@ -84,3 +84,7 @@ log_file = "/var/log/parity.log"
color = true
ports_shift = 0
unsafe_expose = false
[whisper]
enabled = true
pool_size = 50

View File

@@ -176,7 +176,7 @@ usage! {
or |c: &Config| otry!(c.rpc).interface.clone(),
flag_jsonrpc_cors: Option<String> = None,
or |c: &Config| otry!(c.rpc).cors.clone().map(Some),
flag_jsonrpc_apis: String = "web3,eth,pubsub,net,parity,parity_pubsub,traces,rpc,secretstore",
flag_jsonrpc_apis: String = "web3,eth,pubsub,net,parity,parity_pubsub,traces,rpc,secretstore,shh,shh_pubsub",
or |c: &Config| otry!(c.rpc).apis.as_ref().map(|vec| vec.join(",")),
flag_jsonrpc_hosts: String = "none",
or |c: &Config| otry!(c.rpc).hosts.as_ref().map(|vec| vec.join(",")),
@@ -192,7 +192,7 @@ usage! {
or |c: &Config| otry!(c.websockets).port.clone(),
flag_ws_interface: String = "local",
or |c: &Config| otry!(c.websockets).interface.clone(),
flag_ws_apis: String = "web3,eth,pubsub,net,parity,parity_pubsub,traces,rpc,secretstore",
flag_ws_apis: String = "web3,eth,pubsub,net,parity,parity_pubsub,traces,rpc,secretstore,shh,shh_pubsub",
or |c: &Config| otry!(c.websockets).apis.as_ref().map(|vec| vec.join(",")),
flag_ws_origins: String = "chrome-extension://*",
or |c: &Config| otry!(c.websockets).origins.as_ref().map(|vec| vec.join(",")),
@@ -204,7 +204,7 @@ usage! {
or |c: &Config| otry!(c.ipc).disable.clone(),
flag_ipc_path: String = if cfg!(windows) { r"\\.\pipe\jsonrpc.ipc" } else { "$BASE/jsonrpc.ipc" },
or |c: &Config| otry!(c.ipc).path.clone(),
flag_ipc_apis: String = "web3,eth,pubsub,net,parity,parity_pubsub,parity_accounts,traces,rpc,secretstore",
flag_ipc_apis: String = "web3,eth,pubsub,net,parity,parity_pubsub,parity_accounts,traces,rpc,secretstore,shh,shh_pubsub",
or |c: &Config| otry!(c.ipc).apis.as_ref().map(|vec| vec.join(",")),
// DAPPS
@@ -365,6 +365,12 @@ usage! {
flag_no_color: bool = false,
or |c: &Config| otry!(c.misc).color.map(|c| !c).clone(),
// -- Whisper options
flag_whisper: bool = false,
or |c: &Config| otry!(c.whisper).enabled,
flag_whisper_pool_size: usize = 10usize,
or |c: &Config| otry!(c.whisper).pool_size.clone(),
// -- Legacy Options supported in configs
flag_dapps_port: Option<u16> = None,
or |c: &Config| otry!(c.dapps).port.clone().map(Some),
@@ -407,6 +413,7 @@ struct Config {
vm: Option<VM>,
misc: Option<Misc>,
stratum: Option<Stratum>,
whisper: Option<Whisper>,
}
#[derive(Default, Debug, PartialEq, Deserialize)]
@@ -603,12 +610,18 @@ struct Misc {
unsafe_expose: Option<bool>,
}
#[derive(Default, Debug, PartialEq, Deserialize)]
struct Whisper {
enabled: Option<bool>,
pool_size: Option<usize>,
}
#[cfg(test)]
mod tests {
use super::{
Args, ArgsError,
Config, Operating, Account, Ui, Network, Ws, Rpc, Ipc, Dapps, Ipfs, Mining, Footprint,
Snapshots, VM, Misc, SecretStore,
Snapshots, VM, Misc, Whisper, SecretStore,
};
use toml;
@@ -860,6 +873,10 @@ mod tests {
// -- Virtual Machine Options
flag_jitvm: false,
// -- Whisper options.
flag_whisper: false,
flag_whisper_pool_size: 20,
// -- Legacy Options
flag_geth: false,
flag_testnet: false,
@@ -1082,6 +1099,10 @@ mod tests {
ports_shift: Some(0),
unsafe_expose: Some(false),
}),
whisper: Some(Whisper {
enabled: Some(true),
pool_size: Some(50),
}),
stratum: None,
});
}

View File

@@ -425,6 +425,11 @@ Snapshot Options:
Virtual Machine Options:
--jitvm Enable the JIT VM. (default: {flag_jitvm})
Whisper Options:
--whisper Enable the Whisper network. (default: {flag_whisper})
--whisper-pool-size MB Target size of the whisper message pool in megabytes.
(default: {flag_whisper_pool_size})
Legacy Options:
--geth Run in Geth-compatibility mode. Sets the IPC path
to be the same as Geth's. Overrides the --ipc-path

View File

@@ -338,6 +338,8 @@ impl Configuration {
_ => (self.gas_pricer_config()?, self.miner_options(self.args.flag_reseal_min_period)?),
};
let whisper_config = self.whisper_config();
let run_cmd = RunCmd {
cache_config: cache_config,
dirs: dirs,
@@ -383,6 +385,7 @@ impl Configuration {
serve_light: !self.args.flag_no_serve_light,
light: self.args.flag_light,
no_persistent_txqueue: self.args.flag_no_persistent_txqueue,
whisper: whisper_config,
};
Cmd::Run(run_cmd)
};
@@ -1068,6 +1071,13 @@ impl Configuration {
settings
}
fn whisper_config(&self) -> ::whisper::Config {
::whisper::Config {
enabled: self.args.flag_whisper,
target_message_pool_size: self.args.flag_whisper_pool_size * 1024 * 1024,
}
}
}
#[cfg(test)]
@@ -1326,6 +1336,7 @@ mod tests {
serve_light: true,
light: false,
no_persistent_txqueue: false,
whisper: Default::default(),
};
expected.secretstore_conf.enabled = cfg!(feature = "secretstore");
assert_eq!(conf.into_command().unwrap().cmd, Cmd::Run(expected));

View File

@@ -63,6 +63,7 @@ extern crate parity_local_store as local_store;
extern crate parity_reactor;
extern crate parity_rpc;
extern crate parity_updater as updater;
extern crate parity_whisper;
extern crate path;
extern crate rpc_cli;
@@ -110,6 +111,7 @@ mod snapshot;
mod upgrade;
mod url;
mod user_defaults;
mod whisper;
#[cfg(feature="ipc")]
mod boot;

View File

@@ -19,7 +19,7 @@ use std::path::Path;
use ethcore::client::BlockChainClient;
use hypervisor::Hypervisor;
use ethsync::{SyncConfig, NetworkConfiguration, NetworkError, Params};
use ethsync::{AttachedProtocol, SyncConfig, NetworkConfiguration, NetworkError, Params};
use ethcore::snapshot::SnapshotService;
use light::Provider;
@@ -151,6 +151,7 @@ pub fn sync(
_snapshot_service: Arc<SnapshotService>,
_provider: Arc<Provider>,
log_settings: &LogConfig,
_attached_protos: Vec<AttachedProtocol>,
) -> Result<SyncModules, NetworkError> {
let mut hypervisor = hypervisor_ref.take().expect("There should be hypervisor for ipc configuration");
let args = sync_arguments(&hypervisor.io_path, sync_cfg, net_cfg, log_settings);
@@ -181,6 +182,7 @@ pub fn sync(
snapshot_service: Arc<SnapshotService>,
provider: Arc<Provider>,
_log_settings: &LogConfig,
attached_protos: Vec<AttachedProtocol>,
) -> Result<SyncModules, NetworkError> {
let eth_sync = EthSync::new(Params {
config: sync_cfg,
@@ -188,6 +190,7 @@ pub fn sync(
provider: provider,
snapshot_service: snapshot_service,
network_config: net_cfg,
attached_protos: attached_protos,
})?;
Ok((eth_sync.clone() as Arc<SyncProvider>, eth_sync.clone() as Arc<ManageNetwork>, eth_sync.clone() as Arc<ChainNotify>))

View File

@@ -32,7 +32,6 @@ pub use parity_rpc::{IpcServer, HttpServer, RequestMiddleware};
pub use parity_rpc::ws::Server as WsServer;
pub use parity_rpc::informant::CpuPool;
pub const DAPPS_DOMAIN: &'static str = "web3.site";
#[derive(Debug, Clone, PartialEq)]
@@ -168,7 +167,6 @@ impl Default for WsConfiguration {
}
}
impl WsConfiguration {
pub fn address(&self) -> Option<(String, u16)> {
match self.enabled {

View File

@@ -67,6 +67,12 @@ pub enum Api {
Rpc,
/// SecretStore (Safe)
SecretStore,
/// Whisper (Safe)
// TODO: _if_ someone guesses someone else's key or filter IDs they can remove
// BUT these are all ephemeral so it seems fine.
Whisper,
/// Whisper Pub-Sub (Safe but same concerns as above).
WhisperPubSub,
}
impl FromStr for Api {
@@ -89,6 +95,8 @@ impl FromStr for Api {
"traces" => Ok(Traces),
"rpc" => Ok(Rpc),
"secretstore" => Ok(SecretStore),
"shh" => Ok(Whisper),
"shh_pubsub" => Ok(WhisperPubSub),
api => Err(format!("Unknown api: {}", api))
}
}
@@ -172,6 +180,8 @@ fn to_modules(apis: &HashSet<Api>) -> BTreeMap<String, String> {
Api::Traces => ("traces", "1.0"),
Api::Rpc => ("rpc", "1.0"),
Api::SecretStore => ("secretstore", "1.0"),
Api::Whisper => ("shh", "1.0"),
Api::WhisperPubSub => ("shh_pubsub", "1.0"),
};
modules.insert(name.into(), version.into());
}
@@ -213,6 +223,7 @@ pub struct FullDependencies {
pub ws_address: Option<(String, u16)>,
pub fetch: FetchClient,
pub remote: parity_reactor::Remote,
pub whisper_rpc: Option<::whisper::RpcFactory>,
}
impl FullDependencies {
@@ -335,6 +346,18 @@ impl FullDependencies {
Api::SecretStore => {
handler.extend_with(SecretStoreClient::new(&self.secret_store).to_delegate());
},
Api::Whisper => {
if let Some(ref whisper_rpc) = self.whisper_rpc {
let whisper = whisper_rpc.make_handler();
handler.extend_with(::parity_whisper::rpc::Whisper::to_delegate(whisper));
}
}
Api::WhisperPubSub => {
if let Some(ref whisper_rpc) = self.whisper_rpc {
let whisper = whisper_rpc.make_handler();
handler.extend_with(::parity_whisper::rpc::WhisperPubSub::to_delegate(whisper));
}
}
}
}
}
@@ -383,6 +406,7 @@ pub struct LightDependencies {
pub fetch: FetchClient,
pub geth_compatibility: bool,
pub remote: parity_reactor::Remote,
pub whisper_rpc: Option<::whisper::RpcFactory>,
}
impl LightDependencies {
@@ -516,6 +540,18 @@ impl LightDependencies {
let secret_store = Some(self.secret_store.clone());
handler.extend_with(SecretStoreClient::new(&secret_store).to_delegate());
},
Api::Whisper => {
if let Some(ref whisper_rpc) = self.whisper_rpc {
let whisper = whisper_rpc.make_handler();
handler.extend_with(::parity_whisper::rpc::Whisper::to_delegate(whisper));
}
}
Api::WhisperPubSub => {
if let Some(ref whisper_rpc) = self.whisper_rpc {
let whisper = whisper_rpc.make_handler();
handler.extend_with(::parity_whisper::rpc::WhisperPubSub::to_delegate(whisper));
}
}
}
}
}
@@ -543,8 +579,17 @@ impl ApiSet {
pub fn list_apis(&self) -> HashSet<Api> {
let mut public_list = [
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::Rpc, Api::SecretStore,
Api::Web3,
Api::Net,
Api::Eth,
Api::EthPubSub,
Api::Parity,
Api::Rpc,
Api::SecretStore,
Api::Whisper,
Api::WhisperPubSub,
].into_iter().cloned().collect();
match *self {
ApiSet::List(ref apis) => apis.clone(),
ApiSet::PublicContext => public_list,
@@ -605,6 +650,8 @@ mod test {
assert_eq!(Api::Traces, "traces".parse().unwrap());
assert_eq!(Api::Rpc, "rpc".parse().unwrap());
assert_eq!(Api::SecretStore, "secretstore".parse().unwrap());
assert_eq!(Api::Whisper, "shh".parse().unwrap());
assert_eq!(Api::WhisperPubSub, "shh_pubsub".parse().unwrap());
assert!("rp".parse::<Api>().is_err());
}
@@ -622,7 +669,7 @@ mod test {
fn test_api_set_unsafe_context() {
let expected = vec![
// make sure this list contains only SAFE methods
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, Api::Whisper, Api::WhisperPubSub,
].into_iter().collect();
assert_eq!(ApiSet::UnsafeContext.list_apis(), expected);
}
@@ -631,7 +678,7 @@ mod test {
fn test_api_set_ipc_context() {
let expected = vec![
// safe
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore,
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, Api::Whisper, Api::WhisperPubSub,
// semi-safe
Api::ParityAccounts
].into_iter().collect();
@@ -642,7 +689,7 @@ mod test {
fn test_api_set_safe_context() {
let expected = vec![
// safe
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore,
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, Api::Whisper, Api::WhisperPubSub,
// semi-safe
Api::ParityAccounts,
// Unsafe
@@ -654,7 +701,7 @@ mod test {
#[test]
fn test_all_apis() {
assert_eq!("all".parse::<ApiSet>().unwrap(), ApiSet::List(vec![
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore,
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, Api::Whisper, Api::WhisperPubSub,
Api::ParityAccounts,
Api::ParitySet, Api::Signer,
Api::Personal
@@ -664,7 +711,7 @@ mod test {
#[test]
fn test_all_without_personal_apis() {
assert_eq!("personal,all,-personal".parse::<ApiSet>().unwrap(), ApiSet::List(vec![
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore,
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, Api::Whisper, Api::WhisperPubSub,
Api::ParityAccounts,
Api::ParitySet, Api::Signer,
].into_iter().collect()));
@@ -673,7 +720,7 @@ mod test {
#[test]
fn test_safe_parsing() {
assert_eq!("safe".parse::<ApiSet>().unwrap(), ApiSet::List(vec![
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore,
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::ParityPubSub, Api::Traces, Api::Rpc, Api::SecretStore, Api::Whisper, Api::WhisperPubSub,
].into_iter().collect()));
}
}

View File

@@ -115,6 +115,7 @@ pub struct RunCmd {
pub serve_light: bool,
pub light: bool,
pub no_persistent_txqueue: bool,
pub whisper: ::whisper::Config
}
pub fn open_ui(ws_conf: &rpc::WsConfiguration, ui_conf: &rpc::UiConfiguration) -> Result<(), String> {
@@ -230,6 +231,17 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
// start on_demand service.
let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone()));
let mut attached_protos = Vec::new();
let whisper_factory = if cmd.whisper.enabled {
let (whisper_net, whisper_factory) = ::whisper::setup(cmd.whisper.target_message_pool_size)
.map_err(|e| format!("Failed to initialize whisper: {}", e))?;
attached_protos.push(whisper_net);
whisper_factory
} else {
None
};
// set network path.
net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned());
let sync_params = LightSyncParams {
@@ -238,6 +250,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
network_id: cmd.network_id.unwrap_or(spec.network_id()),
subprotocol_name: ethsync::LIGHT_PROTOCOL,
handlers: vec![on_demand.clone()],
attached_protos: attached_protos,
};
let light_sync = LightSync::new(sync_params).map_err(|e| format!("Error starting network: {}", e))?;
let light_sync = Arc::new(light_sync);
@@ -318,6 +331,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
fetch: fetch,
geth_compatibility: cmd.geth_compatibility,
remote: event_loop.remote(),
whisper_rpc: whisper_factory,
});
let dependencies = rpc::Dependencies {
@@ -589,6 +603,18 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
.map_err(|e| format!("Stratum start error: {:?}", e))?;
}
let mut attached_protos = Vec::new();
let whisper_factory = if cmd.whisper.enabled {
let (whisper_net, whisper_factory) = ::whisper::setup(cmd.whisper.target_message_pool_size)
.map_err(|e| format!("Failed to initialize whisper: {}", e))?;
attached_protos.push(whisper_net);
whisper_factory
} else {
None
};
// create sync object
let (sync_provider, manage_network, chain_notify) = modules::sync(
&mut hypervisor,
@@ -598,6 +624,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
snapshot_service.clone(),
client.clone(),
&cmd.logger_config,
attached_protos,
).map_err(|e| format!("Sync error: {}", e))?;
service.add_notify(chain_notify.clone());
@@ -681,6 +708,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
ws_address: cmd.ws_conf.address(),
fetch: fetch.clone(),
remote: event_loop.remote(),
whisper_rpc: whisper_factory,
});
let dependencies = rpc::Dependencies {

View File

@@ -52,11 +52,12 @@ pub fn main() {
let remote_provider = dependency!(LightProviderClient, &service_urls::with_base(&service_config.io_path, service_urls::LIGHT_PROVIDER));
let sync = EthSync::new(Params {
config: service_config.sync,
chain: remote_client.service().clone(),
snapshot_service: remote_snapshot.service().clone(),
config: service_config.sync,
chain: remote_client.service().clone(),
snapshot_service: remote_snapshot.service().clone(),
provider: remote_provider.service().clone(),
network_config: service_config.net
attached_protos: Vec::new(),
}).unwrap();
let _ = boot::main_thread();

77
parity/whisper.rs Normal file
View File

@@ -0,0 +1,77 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::io;
use ethsync::AttachedProtocol;
use parity_rpc::Metadata;
use parity_whisper::net::{self as whisper_net, PoolHandle, Network as WhisperNetwork};
use parity_whisper::rpc::{WhisperClient, FilterManager};
/// Whisper config.
#[derive(Debug, PartialEq, Eq)]
pub struct Config {
pub enabled: bool,
pub target_message_pool_size: usize,
}
impl Default for Config {
fn default() -> Self {
Config {
enabled: false,
target_message_pool_size: 10 * 1024 * 1024,
}
}
}
/// Factory for standard whisper RPC.
pub struct RpcFactory {
net: Arc<WhisperNetwork<Arc<FilterManager>>>,
manager: Arc<FilterManager>,
}
impl RpcFactory {
pub fn make_handler(&self) -> WhisperClient<PoolHandle, Metadata> {
WhisperClient::new(self.net.handle(), self.manager.clone())
}
}
/// Sets up whisper protocol and RPC handler.
///
/// Will target the given pool size.
#[cfg(not(feature = "ipc"))]
pub fn setup(target_pool_size: usize) -> io::Result<(AttachedProtocol, Option<RpcFactory>)> {
let manager = Arc::new(FilterManager::new()?);
let net = Arc::new(WhisperNetwork::new(target_pool_size, manager.clone()));
let proto = AttachedProtocol {
handler: net.clone() as Arc<_>,
packet_count: whisper_net::PACKET_COUNT,
versions: whisper_net::SUPPORTED_VERSIONS,
protocol_id: *b"shh",
};
let factory = RpcFactory { net: net, manager: manager };
Ok((proto, Some(factory)))
}
// TODO: make it possible to attach generic protocols in IPC.
#[cfg(feature = "ipc")]
pub fn setup(_pool: usize) -> (AttachedProtocol, Option<RpcFactory>) {
Ok((AttachedProtocol, None))
}